package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"

	. "github.com/coroot/coroot-node-agent/utils/modelse"
	"github.com/klauspost/compress/zstd"
	klog "github.com/sirupsen/logrus"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
)

// UploadApmTraces JSON-encodes rootData and POSTs it to the configured
// endpoint through d.requestFunc.
//
// codeType is converted to a CodeType and used by newApmRequest to fill the
// "routingKey" header. The number of elements in rootData is sent in the
// "DataCount" header.
//
// A 2xx response with an empty body is treated as success. A 2xx response
// with a body must decode into RespDataT with Msg "send ok" and Code 1000;
// anything else is reported as an error. 429 and 503 responses return the
// error built by newResponseError from the response headers (presumably what
// d.requestFunc uses to decide on a retry -- confirm against its
// implementation). Every other status yields a plain error.
func (d *client) UploadApmTraces(ctx context.Context, rootData []otlptrace.RootDataT, codeType int) error {
	rawRequest, err := json.Marshal(rootData)
	if err != nil {
		return err
	}

	ctx, cancel := d.contextWithStop(ctx)
	defer cancel()

	klog.Debugln("enter the UploadApmTraces newApmRequest")
	request, err := d.newApmRequest(rawRequest, len(rootData), CodeType(codeType))
	if err != nil {
		return err
	}

	return d.requestFunc(ctx, func(ctx context.Context) error {
		// Bail out before doing any work if the context is already done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// The same request value is reused on every invocation of this
		// closure, so it is reset before each send.
		request.reset(ctx)
		resp, err := d.client.Do(request.Request)
		if err != nil {
			return err
		}

		if resp != nil && resp.Body != nil {
			defer func() {
				if err := resp.Body.Close(); err != nil {
					otel.Handle(err)
				}
			}()
		}

		switch sc := resp.StatusCode; {
		case sc >= 200 && sc <= 299:
			// Success, do not retry.
			// Read the response envelope, if any.
			var respData bytes.Buffer
			if _, err := io.Copy(&respData, resp.Body); err != nil {
				return err
			}
			if respData.Len() == 0 {
				return nil
			}

			var respJsonData RespDataT
			if err := json.Unmarshal(respData.Bytes(), &respJsonData); err != nil {
				return err
			}
			msg := respJsonData.Msg
			code := respJsonData.Code
			klog.Infof("data/receive response body: %s\n", respData.String())
			// An HTTP 2xx is not enough: the body must also carry the
			// expected acknowledgement.
			if msg != "send ok" || code != 1000 {
				return fmt.Errorf("resp error msg:<%s> code:<%d>", msg, code)
			}
			return nil

		case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
			// Retry-able failures.
			// Drain the body to reuse the connection.
			if _, err := io.Copy(io.Discard, resp.Body); err != nil {
				otel.Handle(err)
			}
			return newResponseError(resp.Header)

		default:
			// Drain the body here as well so the connection can be reused,
			// matching the retry-able branch above.
			if _, err := io.Copy(io.Discard, resp.Body); err != nil {
				otel.Handle(err)
			}
			return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
		}
	})
}

// RespDataT is the JSON envelope decoded from the body of a 2xx response in
// UploadApmTraces.
type RespDataT struct {
	// Code is read from the "code" field; UploadApmTraces accepts only 1000.
	Code int `json:"code"`
	// Msg is read from the "message" field; UploadApmTraces accepts only
	// "send ok".
	Msg string `json:"message"`
}

// newApmRequest builds the POST request that carries body to the configured
// endpoint.
//
// mapLen is written to the "DataCount" header and codeType.Topic() to the
// "routingKey" header. The body is attached through req.bodyReader, encoded
// according to d.cfg.Compression (none, gzip or zstd). An unrecognised
// compression value results in an error.
func (d *client) newApmRequest(body []byte, mapLen int, codeType CodeType) (request, error) {
	u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
	// The body is supplied later through req.bodyReader, so none is passed
	// here.
	r, err := http.NewRequest(http.MethodPost, u.String(), nil)
	if err != nil {
		return request{Request: r}, err
	}

	userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
	r.Header.Set("User-Agent", userAgent)

	for k, v := range d.cfg.Headers {
		r.Header.Set(k, v)
	}
	// The payload is JSON text sent as text/plain.
	r.Header.Set("Content-Type", "text/plain;charset=utf-8")
	r.Header.Set("routingKey", codeType.Topic())
	klog.Debugln(codeType.Topic())
	r.Header.Set("DataCount", strconv.Itoa(mapLen))

	req := request{Request: r}
	switch Compression(d.cfg.Compression) {
	case NoCompression:
		r.ContentLength = (int64)(len(body))
		req.bodyReader = bodyReader(body)
		klog.Infoln("enter the NoCompression newApmRequest")

	case GzipCompression:
		klog.Infoln("enter the GzipCompression newApmRequest")
		// Ensure the content length is not used.
		r.ContentLength = -1
		r.Header.Set("Content-Encoding", "gzip")

		gz := gzPool.Get().(*gzip.Writer)
		defer gzPool.Put(gz)

		var b bytes.Buffer
		gz.Reset(&b)

		if _, err := gz.Write(body); err != nil {
			return req, err
		}
		// Close needs to be called to ensure body is fully written.
		if err := gz.Close(); err != nil {
			return req, err
		}

		req.bodyReader = bodyReader(b.Bytes())

	case ZstdCompression:
		klog.Debugln("enter the ZstdCompression newApmRequest")
		// Ensure the content length is not used.
		r.ContentLength = -1
		r.Header.Set("Content-Encoding", "zstd")

		// The encoder is only used for the one-shot EncodeAll call, so no
		// destination writer is needed.
		encoder, err := zstd.NewWriter(nil)
		if err != nil {
			return req, err
		}
		defer encoder.Close()

		compressedData := encoder.EncodeAll(body, make([]byte, 0, len(body)))
		req.bodyReader = bodyReader(compressedData)

	default:
		// Without a body reader the request would go out without its
		// payload; fail fast instead of sending an empty upload.
		return req, fmt.Errorf("unsupported compression type: %v", d.cfg.Compression)
	}

	return req, nil
}