| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
- import (
- "bytes"
- "compress/gzip"
- "context"
- "encoding/json"
- "fmt"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
- "io"
- "net/http"
- "net/url"
- "os"
- "strconv"
- )
- func (d *client) UploadApmTraces(ctx context.Context, rootData []otlptrace.RootDataT) error {
- //pbRequest := &coltracepb.ExportTraceServiceRequest{
- // ResourceSpans: protoSpans,
- //}
- rawRequest, err := json.Marshal(rootData)
- if err != nil {
- return err
- }
- ctx, cancel := d.contextWithStop(ctx)
- defer cancel()
- mapLen := len(rootData)
- request, err := d.newApmRequest(rawRequest, mapLen)
- if err != nil {
- return err
- }
- return d.requestFunc(ctx, func(ctx context.Context) error {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- request.reset(ctx)
- resp, err := d.client.Do(request.Request)
- if err != nil {
- return err
- }
- if resp != nil && resp.Body != nil {
- defer func() {
- if err := resp.Body.Close(); err != nil {
- otel.Handle(err)
- }
- }()
- }
- switch sc := resp.StatusCode; {
- case sc >= 200 && sc <= 299:
- // Success, do not retry.
- // Read the partial success message, if any.
- var respData bytes.Buffer
- if _, err := io.Copy(&respData, resp.Body); err != nil {
- return err
- }
- if respData.Len() != 0 {
- var respJsonData RespDataT
- if err := json.Unmarshal(respData.Bytes(), &respJsonData); err != nil {
- return err
- }
- msg := respJsonData.Msg
- code := respJsonData.Code
- if msg != "send ok" || code != 1000 {
- return fmt.Errorf("resp error msg:<%s> code:<%d>", msg, code)
- }
- }
- return nil
- case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
- // Retry-able failures. Drain the body to reuse the connection.
- if _, err := io.Copy(io.Discard, resp.Body); err != nil {
- otel.Handle(err)
- }
- return newResponseError(resp.Header)
- default:
- return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
- }
- })
- }
// RespDataT is the JSON envelope decoded from a non-empty 2xx response body.
type RespDataT struct {
	// Code is read from the "code" JSON field.
	Code int `json:"code"`
	// Msg is read from the "message" JSON field.
	Msg string `json:"message"`
}
- func (d *client) newApmRequest(body []byte, mapLen int) (request, error) {
- u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
- r, err := http.NewRequest(http.MethodPost, u.String(), nil)
- if err != nil {
- return request{Request: r}, err
- }
- userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
- r.Header.Set("User-Agent", userAgent)
- for k, v := range d.cfg.Headers {
- r.Header.Set(k, v)
- }
- //r.Header.Set("Content-Type", contentTypeProto)
- r.Header.Set("Content-Type", "text/plain;charset=utf-8")
- if os.Getenv("JAVA") == "1" {
- r.Header.Set("routingKey", "javaTopic")
- } else {
- r.Header.Set("routingKey", "goTopic")
- }
- r.Header.Set("DataCount", strconv.Itoa(mapLen))
- req := request{Request: r}
- switch Compression(d.cfg.Compression) {
- case NoCompression:
- r.ContentLength = (int64)(len(body))
- req.bodyReader = bodyReader(body)
- case GzipCompression:
- // Ensure the content length is not used.
- r.ContentLength = -1
- r.Header.Set("Content-Encoding", "gzip")
- gz := gzPool.Get().(*gzip.Writer)
- defer gzPool.Put(gz)
- var b bytes.Buffer
- gz.Reset(&b)
- if _, err := gz.Write(body); err != nil {
- return req, err
- }
- // Close needs to be called to ensure body if fully written.
- if err := gz.Close(); err != nil {
- return req, err
- }
- req.bodyReader = bodyReader(b.Bytes())
- }
- return req, nil
- }
|