client_apm.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "go.opentelemetry.io/otel"
  9. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  10. "io"
  11. "net/http"
  12. "net/url"
  13. "strconv"
  14. )
  15. func (d *client) UploadApmTraces(ctx context.Context, rootData []otlptrace.RootData) error {
  16. //pbRequest := &coltracepb.ExportTraceServiceRequest{
  17. // ResourceSpans: protoSpans,
  18. //}
  19. rawRequest, err := json.Marshal(rootData)
  20. if err != nil {
  21. return err
  22. }
  23. ctx, cancel := d.contextWithStop(ctx)
  24. defer cancel()
  25. mapLen := len(rootData)
  26. request, err := d.newApmRequest(rawRequest, mapLen)
  27. if err != nil {
  28. return err
  29. }
  30. return d.requestFunc(ctx, func(ctx context.Context) error {
  31. select {
  32. case <-ctx.Done():
  33. return ctx.Err()
  34. default:
  35. }
  36. request.reset(ctx)
  37. resp, err := d.client.Do(request.Request)
  38. if err != nil {
  39. return err
  40. }
  41. if resp != nil && resp.Body != nil {
  42. defer func() {
  43. if err := resp.Body.Close(); err != nil {
  44. otel.Handle(err)
  45. }
  46. }()
  47. }
  48. switch sc := resp.StatusCode; {
  49. case sc >= 200 && sc <= 299:
  50. // Success, do not retry.
  51. // Read the partial success message, if any.
  52. var respData bytes.Buffer
  53. if _, err := io.Copy(&respData, resp.Body); err != nil {
  54. return err
  55. }
  56. if respData.Len() != 0 {
  57. var respJsonData RespDataT
  58. if err := json.Unmarshal(respData.Bytes(), &respJsonData); err != nil {
  59. return err
  60. }
  61. msg := respJsonData.Msg
  62. code := respJsonData.Code
  63. if msg != "send ok" || code != 1000 {
  64. return fmt.Errorf("resp error msg:<%s> code:<%d>", msg, code)
  65. }
  66. }
  67. return nil
  68. case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
  69. // Retry-able failures. Drain the body to reuse the connection.
  70. if _, err := io.Copy(io.Discard, resp.Body); err != nil {
  71. otel.Handle(err)
  72. }
  73. return newResponseError(resp.Header)
  74. default:
  75. return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
  76. }
  77. })
  78. }
  // RespDataT is the JSON document decoded from the body of a 2xx APM upload
  // response. UploadApmTraces treats code 1000 together with message "send ok"
  // as success.
  type RespDataT struct {
  	// Code is read from the "code" JSON field.
  	Code int `json:"code"`
  	// Msg is read from the "message" JSON field.
  	Msg string `json:"message"`
  }
  83. func (d *client) newApmRequest(body []byte, mapLen int) (request, error) {
  84. u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
  85. r, err := http.NewRequest(http.MethodPost, u.String(), nil)
  86. if err != nil {
  87. return request{Request: r}, err
  88. }
  89. userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
  90. r.Header.Set("User-Agent", userAgent)
  91. for k, v := range d.cfg.Headers {
  92. r.Header.Set(k, v)
  93. }
  94. //r.Header.Set("Content-Type", contentTypeProto)
  95. r.Header.Set("Content-Type", "text/plain;charset=utf-8")
  96. r.Header.Set("routingKey", "goTopic")
  97. r.Header.Set("DataCount", strconv.Itoa(mapLen))
  98. req := request{Request: r}
  99. switch Compression(d.cfg.Compression) {
  100. case NoCompression:
  101. r.ContentLength = (int64)(len(body))
  102. req.bodyReader = bodyReader(body)
  103. case GzipCompression:
  104. // Ensure the content length is not used.
  105. r.ContentLength = -1
  106. r.Header.Set("Content-Encoding", "gzip")
  107. gz := gzPool.Get().(*gzip.Writer)
  108. defer gzPool.Put(gz)
  109. var b bytes.Buffer
  110. gz.Reset(&b)
  111. if _, err := gz.Write(body); err != nil {
  112. return req, err
  113. }
  114. // Close needs to be called to ensure body if fully written.
  115. if err := gz.Close(); err != nil {
  116. return req, err
  117. }
  118. req.bodyReader = bodyReader(b.Bytes())
  119. }
  120. return req, nil
  121. }