client_apm.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. . "github.com/coroot/coroot-node-agent/utils/modelse"
  9. "github.com/klauspost/compress/zstd"
  10. klog "github.com/sirupsen/logrus"
  11. "go.opentelemetry.io/otel"
  12. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  13. "io"
  14. "net/http"
  15. "net/url"
  16. "strconv"
  17. )
  18. func (d *client) UploadApmTraces(ctx context.Context, rootData []otlptrace.RootDataT, codeType int) error {
  19. //pbRequest := &coltracepb.ExportTraceServiceRequest{
  20. // ResourceSpans: protoSpans,
  21. //}
  22. klog.Infoln("enter the UploadApmTraces newApmRequest-----")
  23. rawRequest, err := json.Marshal(rootData)
  24. if err != nil {
  25. return err
  26. }
  27. ctx, cancel := d.contextWithStop(ctx)
  28. defer cancel()
  29. mapLen := len(rootData)
  30. klog.Infoln("enter the UploadApmTraces newApmRequest")
  31. request, err := d.newApmRequest(rawRequest, mapLen, CodeType(codeType))
  32. if err != nil {
  33. return err
  34. }
  35. return d.requestFunc(ctx, func(ctx context.Context) error {
  36. select {
  37. case <-ctx.Done():
  38. return ctx.Err()
  39. default:
  40. }
  41. request.reset(ctx)
  42. resp, err := d.client.Do(request.Request)
  43. if err != nil {
  44. return err
  45. }
  46. if resp != nil && resp.Body != nil {
  47. defer func() {
  48. if err := resp.Body.Close(); err != nil {
  49. otel.Handle(err)
  50. }
  51. }()
  52. }
  53. switch sc := resp.StatusCode; {
  54. case sc >= 200 && sc <= 299:
  55. // Success, do not retry.
  56. // Read the partial success message, if any.
  57. var respData bytes.Buffer
  58. if _, err := io.Copy(&respData, resp.Body); err != nil {
  59. return err
  60. }
  61. if respData.Len() != 0 {
  62. var respJsonData RespDataT
  63. if err := json.Unmarshal(respData.Bytes(), &respJsonData); err != nil {
  64. return err
  65. }
  66. msg := respJsonData.Msg
  67. code := respJsonData.Code
  68. if msg != "send ok" || code != 1000 {
  69. return fmt.Errorf("resp error msg:<%s> code:<%d>", msg, code)
  70. }
  71. }
  72. return nil
  73. case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
  74. // Retry-able failures. Drain the body to reuse the connection.
  75. if _, err := io.Copy(io.Discard, resp.Body); err != nil {
  76. otel.Handle(err)
  77. }
  78. return newResponseError(resp.Header)
  79. default:
  80. return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
  81. }
  82. })
  83. }
  84. type RespDataT struct {
  85. Code int `json:"code"`
  86. Msg string `json:"message"`
  87. }
  88. func (d *client) newApmRequest(body []byte, mapLen int, codeType CodeType) (request, error) {
  89. u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
  90. r, err := http.NewRequest(http.MethodPost, u.String(), nil)
  91. if err != nil {
  92. return request{Request: r}, err
  93. }
  94. userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
  95. r.Header.Set("User-Agent", userAgent)
  96. for k, v := range d.cfg.Headers {
  97. r.Header.Set(k, v)
  98. }
  99. //r.Header.Set("Content-Type", contentTypeProto)
  100. r.Header.Set("Content-Type", "text/plain;charset=utf-8")
  101. r.Header.Set("routingKey", codeType.Topic())
  102. klog.Infoln(codeType.Topic())
  103. r.Header.Set("DataCount", strconv.Itoa(mapLen))
  104. req := request{Request: r}
  105. // icase := ZstdCompression
  106. switch Compression(d.cfg.Compression) {
  107. // switch Compression(icase) {
  108. case NoCompression:
  109. r.ContentLength = (int64)(len(body))
  110. req.bodyReader = bodyReader(body)
  111. klog.Infoln("enter the NoCompression newApmRequest")
  112. case GzipCompression:
  113. klog.Infoln("enter the GzipCompression newApmRequest")
  114. // Ensure the content length is not used.
  115. r.ContentLength = -1
  116. r.Header.Set("Content-Encoding", "gzip")
  117. gz := gzPool.Get().(*gzip.Writer)
  118. defer gzPool.Put(gz)
  119. var b bytes.Buffer
  120. gz.Reset(&b)
  121. if _, err := gz.Write(body); err != nil {
  122. return req, err
  123. }
  124. // Close needs to be called to ensure body if fully written.
  125. if err := gz.Close(); err != nil {
  126. return req, err
  127. }
  128. req.bodyReader = bodyReader(b.Bytes())
  129. case ZstdCompression:
  130. klog.Infoln("enter the ZstdCompression newApmRequest")
  131. r.ContentLength = -1
  132. r.Header.Set("Content-Encoding", "zstd")
  133. encoder, err := zstd.NewWriter(nil)
  134. if err != nil {
  135. return req, err
  136. }
  137. defer encoder.Close()
  138. compressedData := encoder.EncodeAll(body, make([]byte, 0, len(body)))
  139. req.bodyReader = bodyReader(compressedData)
  140. }
  141. return req, nil
  142. }