package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
)

// UploadApmTraces JSON-encodes rootData and POSTs it to the configured
// endpoint, running the exchange through d.requestFunc.
//
// Response handling:
//   - 2xx with an empty body is success.
//   - 2xx with a body: the body is decoded as RespDataT and must carry
//     code 1000 and message "send ok"; anything else is returned as an error.
//   - 429 and 503: the body is drained and the error built by
//     newResponseError from the response headers is returned.
//   - Any other status is returned as a plain error naming the URL and status.
func (d *client) UploadApmTraces(ctx context.Context, rootData []otlptrace.RootData) error {
	// The payload is sent as JSON instead of the protobuf request below.
	//pbRequest := &coltracepb.ExportTraceServiceRequest{
	//	ResourceSpans: protoSpans,
	//}
	rawRequest, err := json.Marshal(rootData)
	if err != nil {
		return fmt.Errorf("encoding APM trace payload: %w", err)
	}

	ctx, cancel := d.contextWithStop(ctx)
	defer cancel()

	// The element count is forwarded to the server in the DataCount header.
	request, err := d.newApmRequest(rawRequest, len(rootData))
	if err != nil {
		return err
	}

	return d.requestFunc(ctx, func(ctx context.Context) error {
		// Bail out before doing any work if the context is already done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		request.reset(ctx)
		resp, err := d.client.Do(request.Request)
		if err != nil {
			// Returned as-is so requestFunc sees the original error value.
			return err
		}

		if resp != nil && resp.Body != nil {
			defer func() {
				if err := resp.Body.Close(); err != nil {
					otel.Handle(err)
				}
			}()
		}

		switch sc := resp.StatusCode; {
		case sc >= 200 && sc <= 299:
			// Success, do not retry.
			// Read the response envelope, if any, and validate it.
			var respData bytes.Buffer
			if _, err := io.Copy(&respData, resp.Body); err != nil {
				return fmt.Errorf("reading APM response body: %w", err)
			}

			if respData.Len() != 0 {
				var respJSONData RespDataT
				if err := json.Unmarshal(respData.Bytes(), &respJSONData); err != nil {
					return fmt.Errorf("decoding APM response body: %w", err)
				}
				msg := respJSONData.Msg
				code := respJSONData.Code
				// Only the exact pair ("send ok", 1000) counts as accepted.
				if msg != "send ok" || code != 1000 {
					return fmt.Errorf("resp error msg:<%s> code:<%d>", msg, code)
				}
			}
			return nil

		case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
			// Retry-able failures. Drain the body to reuse the connection.
			if _, err := io.Copy(io.Discard, resp.Body); err != nil {
				otel.Handle(err)
			}
			// Returned unwrapped so requestFunc receives the value exactly
			// as newResponseError built it.
			return newResponseError(resp.Header)

		default:
			return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
		}
	})
}

// RespDataT is the JSON envelope UploadApmTraces decodes from a non-empty
// 2xx response body.
type RespDataT struct {
	// Code is read from the "code" field; UploadApmTraces accepts only 1000.
	Code int `json:"code"`
	// Msg is read from the "message" field; UploadApmTraces accepts only
	// "send ok".
	Msg string `json:"message"`
}

// newApmRequest builds the POST request used by UploadApmTraces.
//
// body is the already-encoded payload and mapLen is the value written to the
// DataCount header. The URL is assembled from the client's scheme, endpoint
// and URL path. Headers from d.cfg.Headers are applied after User-Agent and
// before the fixed Content-Type, routingKey and DataCount headers, so the
// fixed headers win on conflict. With gzip compression the body is compressed
// up front and the content length is left unset.
func (d *client) newApmRequest(body []byte, mapLen int) (request, error) {
	u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
	r, err := http.NewRequest(http.MethodPost, u.String(), nil)
	if err != nil {
		return request{Request: r}, err
	}

	userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
	r.Header.Set("User-Agent", userAgent)

	for k, v := range d.cfg.Headers {
		r.Header.Set(k, v)
	}
	//r.Header.Set("Content-Type", contentTypeProto)
	r.Header.Set("Content-Type", "text/plain;charset=utf-8")
	r.Header.Set("routingKey", "goTopic")
	r.Header.Set("DataCount", strconv.Itoa(mapLen))

	req := request{Request: r}
	switch Compression(d.cfg.Compression) {
	case NoCompression:
		r.ContentLength = int64(len(body))
		req.bodyReader = bodyReader(body)
	case GzipCompression:
		// Ensure the content length is not used.
		r.ContentLength = -1
		r.Header.Set("Content-Encoding", "gzip")

		gz := gzPool.Get().(*gzip.Writer)
		defer gzPool.Put(gz)

		var b bytes.Buffer
		gz.Reset(&b)

		if _, err := gz.Write(body); err != nil {
			return req, err
		}
		// Close needs to be called to ensure body is fully written.
		if err := gz.Close(); err != nil {
			return req, err
		}

		req.bodyReader = bodyReader(b.Bytes())
	}

	return req, nil
}