Browse Source

Merge branch 'dev-XLsmart' of https://git.cloudwise.com/TSB/euspace into dev-XLsmart-goTls

Tom 7 tháng trước cách đây
mục cha
commit
2332b8ff61

+ 1 - 0
containers/apm_register_app.go

@@ -64,6 +64,7 @@ func (c *Container) RegisterAppInfo(r *Registry, pid uint32) error {
 			CodeType:    c.AppInfo.CodeType.Int(),
 			App_type:    1,
 			AppLang:     "ebpf",
+			Sys:         "",
 		}
 		//klog.Infof("[%sregister app] Register App req: %s.", cl, registerAppReq.String())
 		klog.Infof("[%sregister app] Register App", cl)

+ 5 - 1
containers/container_apm.go

@@ -123,7 +123,11 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 				method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
 				ip, _ := netaddr.ParseIP(sn)
 				//codeType := c.GetCodeTypeFromCache(pid)
-				trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo())
+				container_id := ""
+				if c.cgroup != nil {
+					container_id = c.cgroup.ContainerId
+				}
+				trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
 				c.SendEvent(trace, r.TraceId)
 			}
 

+ 1 - 0
dist/aarch64/package_dir/bin/agentctl

@@ -134,6 +134,7 @@ startAgent() {
   if [ -f "${AGENT_BIN_DIR}/${AGENT_PROC}" ]; then
     export DISABLE_E2E_TRACING=false
     export DISABLE_STACK_TRACING=true
+    export RUN_IN_OMNIAGENT=true
     export SEND=1
     local params="--listen=0.0.0.0:8123"
 #    if [ -f "/etc/chaosd/pki/ca.crt" ] && [ -f "/etc/chaosd/pki/chaosd.crt" ] && [ -f "/etc/chaosd/pki/chaosd.key" ]; then

+ 1 - 0
dist/x86_64/package_dir/bin/agentctl

@@ -134,6 +134,7 @@ startAgent() {
   if [ -f "${AGENT_BIN_DIR}/${AGENT_PROC}" ]; then
     export DISABLE_E2E_TRACING=false
     export DISABLE_STACK_TRACING=true
+    export RUN_IN_OMNIAGENT=true
     export SEND=1
     local params="--listen=0.0.0.0:8123"
 #    if [ -f "/etc/chaosd/pki/ca.crt" ] && [ -f "/etc/chaosd/pki/chaosd.crt" ] && [ -f "/etc/chaosd/pki/chaosd.key" ]; then

+ 5 - 3
ebpftracer/ebpf/l7/l7.c

@@ -29,6 +29,7 @@
 #define METHOD_HTTP2_CLIENT_FRAMES  5
 #define METHOD_HTTP2_SERVER_FRAMES  6
 
+#define ERROR_MSG_PAYLOAD_SIZE 128
 #define MAX_PAYLOAD_SIZE 1024 // must be power of 2
 #define TRUNCATE_PAYLOAD_SIZE(size) ({                                  \
     size = MIN(size, MAX_PAYLOAD_SIZE-1);                               \
@@ -95,7 +96,8 @@ struct l7_event {
 	unsigned char app_id_from[APM_APP_ID_SIZE];
 	unsigned char span_id_from[APM_SPAN_ID_SIZE];
     unsigned char type_from[APM_TYPE_FROM_SIZE];
-
+    // Error message text (written by is_mysql_response for MySQL error packets)
+    unsigned char error_message[ERROR_MSG_PAYLOAD_SIZE];
 //    __u32 test_id;
     char payload[MAX_PAYLOAD_SIZE];
 } ;
@@ -1013,12 +1015,12 @@ int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long in
             char resp_combined_packet[5];
             bpf_probe_read(resp_combined_packet,MYSQL_PACKAGE_HEADER_LEN, resp_packet_header);
             bpf_probe_read(&resp_combined_packet[4],1, payload);
-            response = is_mysql_response(resp_combined_packet,sizeof(resp_combined_packet), req->request_type, &e->statement_id, &e->status);
+            response = is_mysql_response(resp_combined_packet,sizeof(resp_combined_packet), req->request_type, &e->statement_id, &e->status, e->error_message);
             if(response) {
                  bpf_map_delete_elem(&active_l7_requests_mysql_resp_header_ctx, &k);
             }
         } else {
-            response = is_mysql_response(payload, ret, req->request_type, &e->statement_id, &e->status);
+            response = is_mysql_response(payload, ret, req->request_type, &e->statement_id, &e->status, e->error_message);
         }
 		if (req->request_type == MYSQL_COM_STMT_PREPARE) {
 			e->method = METHOD_STATEMENT_PREPARE;

+ 26 - 6
ebpftracer/ebpf/l7/mysql.c

@@ -38,7 +38,7 @@ int is_mysql_query(char *buf, __u64 buf_size, __u8 *request_type) {
 }
 
 static __always_inline
-int is_mysql_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *statement_id, __u32 *status) {
+int is_mysql_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *statement_id, __u32 *status,unsigned char *error_message) {
     __u8 b[5];
     bpf_read(buf, b);
     if (b[3] < 1) { // sequence must be > 0
@@ -56,9 +56,29 @@ int is_mysql_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *state
         *status = STATUS_OK;
         return 1;
     }
-    if (b[4] == MYSQL_RESPONSE_ERROR) {
-        *status = STATUS_FAILED;
-        return 1;
-    }
-    return 0;
+	if (b[4] == MYSQL_RESPONSE_ERROR) {
+		// MySQL error packet layout:
+		// packet_length(3) + packet_number(1) + 0xff(1) + error_code(2) + sql_state_marker(1) + sql_state(5) + error_message(variable)
+		__u16 error_code = 0;
+//		char error_message[128] = {};
+		__u8 error_message_len = 0;
+
+		// Read the error code (2 bytes, little-endian on the wire)
+		if (buf_size >= 7) {
+			bpf_probe_read(&error_code, sizeof(error_code), buf + 5);
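+			error_code = bpf_ntohs(error_code); // intended: convert to host byte order. NOTE(review): the wire value is little-endian while ntohs converts from big-endian — confirm this does not byte-swap the code on little-endian hosts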
+		}
+
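+		// Read the error message (starts at byte offset 13, after the SQL state). NOTE(review): the length below is cast to __u8 (up to 255) but the callers pass a buffer of ERROR_MSG_PAYLOAD_SIZE (128) bytes — confirm the copy and the terminator write are bounded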
+		if (buf_size > 13) {
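+			error_message_len = (__u8) (length - 9); // 9 = 0xff marker(1) + error code(2) + '#' marker and SQL state(6); assumes length is the payload length without the 4-byte header — TODO confirm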
+			bpf_probe_read(error_message, error_message_len, buf + 13);
+			error_message[error_message_len] = '\0'; // ensure the string is NUL-terminated
+		}
+		// Log the error code and message with bpf_printk
+		bpf_printk("MySQL Error %d: %s", error_code, error_message);
+		*status = STATUS_FAILED;
+		return 1;
+	}
+	return 0;
 }

+ 2 - 1
ebpftracer/l7/l7.go

@@ -206,6 +206,7 @@ type RequestData struct {
 		InstanceIdFrom string
 		AppIdFrom      string
 		SpanIdFrom     string
-		TypeFrom	   string
+		TypeFrom       string
 	}
+	ErrorMsg string
 }

+ 7 - 5
ebpftracer/tracer.go

@@ -537,12 +537,12 @@ type l7Event struct {
 	EventCount          uint32
 	Sport               uint16
 	Dport               uint16
-	SAddr               [16]byte
-	DAddr               [16]byte
+	SAddr               HashByte16
+	DAddr               HashByte16
 	ComponentSport      uint16
 	ComponentDport      uint16
-	ComponentSAddr      [16]byte
-	ComponentDAddr      [16]byte
+	ComponentSAddr      HashByte16
+	ComponentDAddr      HashByte16
 	AssumedAppId        HashByte
 	SpanId              HashByte
 	TraceIdFrom         HashByte16
@@ -550,7 +550,8 @@ type l7Event struct {
 	InstanceIdFrom      HashByte
 	AppIdFrom           HashByte
 	SpanIdFrom          HashByte
-	TypeFrom          	[1]byte
+	TypeFrom            [1]byte
+	ErrorMsg            HashByte128
 }
 
 type SocketDataBufferddd struct {
@@ -779,6 +780,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				EndAt:          v.EndtAt,
 				ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
 				ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
+				ErrorMsg:       strings.TrimRight(string(v.ErrorMsg[:]), "\x00"),
 			}
 			if req.Protocol == l7.ProtocolHTTP {
 				klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())

+ 5 - 5
flags/flags.go

@@ -39,10 +39,10 @@ var (
 	DisableL7Tracing  = kingpin.Flag("disable-l7-tracing", "Disable L7 tracing").Default("false").Envar("DISABLE_L7_TRACING").Bool()
 
 	ExternalNetworksWhitelist = kingpin.
-					Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
-					Envar("TRACK_PUBLIC_NETWORK").
-					Default("0.0.0.0/0").
-					Strings()
+		Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
+		Envar("TRACK_PUBLIC_NETWORK").
+		Default("0.0.0.0/0").
+		Strings()
 	EphemeralPortRange = kingpin.Flag("ephemeral-port-range", "Destination and Listen TCP ports from this range will be skipped").Default("").Envar("EPHEMERAL_PORT_RANGE").String()
 
 	Provider          = kingpin.Flag("provider", "`provider` label for `node_cloud_info` metric").Envar("PROVIDER").String()
@@ -128,7 +128,7 @@ func init() {
 	// set ServerPrefix
 	// set ConfigServer
 	// set DataServer
-	if *CommonIni != "" {
+	if *RunInOmniagent {
 		iniData, err := ini.Load(*CommonIni)
 		if err == nil && iniData != nil {
 			*ServerPrefix = "/apm"

+ 8 - 6
node/apm_host_info.go

@@ -14,16 +14,18 @@ import (
 )
 
 func NewNodeInfo(name, kv, version string) (*NodeInfoT, error) {
-	ni, err := newNodeInfoFromCommonIni(name, kv, version)
-	if err != nil {
-		klog.Errorf("Failed to create node info from common ini: %v", err)
-	} else {
+	if *flags.RunInOmniagent {
 		klog.Infof("run in omniagent.")
-		*flags.RunInOmniagent = true
-		return ni, err
+		ni, err := newNodeInfoFromCommonIni(name, kv, version)
+		if err != nil {
+			klog.Errorf("Failed to create node info from common ini: %v", err)
+			return nil, err
+		}
+		return ni, nil
 	}
 
 	var ip string
+	var err error
 	if *flags.RunInContainer {
 		klog.Infof("run in container.")
 		ip, err = kube.GetNodeIpByCore(name)

+ 15 - 6
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -64,7 +64,7 @@ type RootDataT struct {
 	MemUP           int           `json:"mem_u_p"`
 	OperType        string        `json:"oper_type"`
 	Parameters      []ParamStruct `json:"parameters,omitempty"`
-	ParentTaskName  int           `json:"parent_task_name"`
+	ParentTaskName  string        `json:"parent_task_name"`
 	Period          int           `json:"period"`
 	RespTime        uint64        `json:"resp_time"`
 	Sampling        int           `json:"sampling"`
@@ -80,10 +80,13 @@ type RootDataT struct {
 	TransIds        []interface{} `json:"trans_ids"`
 	TypeFrom        string        `json:"type_from"`
 	Uri             string        `json:"uri"`
-	UserDir         int           `json:"user_dir"`
+	UserDir         string        `json:"user_dir"`
 	VipIds          []interface{} `json:"vip_ids"`
 	SrcAddr         string        `json:"src_addr"`
 	DestinationAddr string        `json:"destination_addr"`
+	// New fields added for op
+	Pid         uint32 `json:"pid"`
+	ContainerID string `json:"container_id"`
 }
 
 // ParamStruct defines the target structure
@@ -438,7 +441,7 @@ func initRootDataFromEvent() RootDataT {
 		MemUP:           0,
 		OperType:        "",
 		Parameters:      []ParamStruct{},
-		ParentTaskName:  0,
+		ParentTaskName:  "",
 		Period:          -1,
 		RespTime:        0,
 		Sampling:        0,
@@ -454,7 +457,7 @@ func initRootDataFromEvent() RootDataT {
 		TransIds:        []interface{}{},
 		TypeFrom:        "",
 		Uri:             "",
-		UserDir:         0,
+		UserDir:         "",
 		VipIds:          []interface{}{},
 		SrcAddr:         "",
 		DestinationAddr: "",
@@ -486,7 +489,7 @@ func initRootDataJava() RootDataT {
 		MemUP:          0,
 		OperType:       "",
 		Parameters:     []ParamStruct{},
-		ParentTaskName: 0,
+		ParentTaskName: "",
 		Period:         -1,
 		RespTime:       0,
 		Sampling:       0,
@@ -502,7 +505,7 @@ func initRootDataJava() RootDataT {
 		TransIds:       []interface{}{},
 		TypeFrom:       "",
 		Uri:            "",
-		UserDir:        0,
+		UserDir:        "",
 		VipIds:         []interface{}{},
 	}
 	return data
@@ -769,6 +772,10 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 			traceRoot.SrcAddr = attr.Value.AsString()
 		case "server.dst_addr":
 			traceRoot.DestinationAddr = attr.Value.AsString()
+		case "server.pid":
+			traceRoot.Pid = uint32(attr.Value.AsInt64())
+		case "server.container_id":
+			traceRoot.ContainerID = attr.Value.AsString()
 		}
 	}
 	traceRoot.Maps = append(traceRoot.Maps, mNode)
@@ -893,6 +900,8 @@ func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 			} else {
 				mNode.Exception = 0
 			}
+		case "sql.exception_msg":
+			mNode.ExceptionMsg = attr.Value.AsString()
 		case "sql.src_addr":
 			mNode.SrcAddr = attr.Value.AsString()
 		case "sql.destination_addr":

+ 3 - 1
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/client_apm.go

@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/utils"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	"github.com/klauspost/compress/zstd"
 	klog "github.com/sirupsen/logrus"
@@ -116,7 +117,8 @@ func (d *client) newApmRequest(body []byte, mapLen int, codeType CodeType) (requ
 	r.Header.Set("routingKey", codeType.Topic())
 	klog.Debugln(codeType.Topic())
 	r.Header.Set("DataCount", strconv.Itoa(mapLen))
-
+	// Integration with op: send the account ID as a request header
+	r.Header.Set("AccountId", strconv.Itoa(utils.GetAccountID()))
 	req := request{Request: r}
 	// icase := ZstdCompression
 	switch Compression(d.cfg.Compression) {

+ 8 - 1
tracing/apm_tracing.go

@@ -129,7 +129,7 @@ func (t *Trace) AllEventReady(traceID uint64) bool {
 	return t.startEventReady && t.endEventReady && *t.currenEventCount >= t.needEventCount
 }
 
-func (t *Trace) TraceStartEvent(method, path, sn string, sport uint16, status l7.Status, addr netaddr.IPPort, pid uint32, appInfo AppInfo) {
+func (t *Trace) TraceStartEvent(method, path, sn string, sport uint16, status l7.Status, addr netaddr.IPPort, pid uint32, appInfo AppInfo, container_id string) {
 	t.span.SetAttributes(semconv.HTTPURL(fmt.Sprintf("http://%s:%d%s", sn, sport, path)),
 		semconv.HTTPMethod(method),
 		attribute.String("http.uri", path))
@@ -148,6 +148,8 @@ func (t *Trace) TraceStartEvent(method, path, sn string, sport uint16, status l7
 		attribute.Int64("server.app_id", appInfo.AppIdHash.IntVal),
 		attribute.Int64("server.agent_id", appInfo.AgentId),
 		attribute.Int64("server.instance_id", appInfo.InstanceIdHash.IntVal),
+		attribute.Int("server.pid", int(pid)),
+		attribute.String("server.container_id", container_id),
 	}
 	t.span.SetAttributes(t.commonAttrs...)
 	t.startReady()
@@ -419,6 +421,11 @@ func (t *Trace) SQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.KeyV
 	attr = append(attr,
 		attribute.Bool("sql.exception", r.Status.Error()),
 	)
+	if r.Status.Error() {
+		attr = append(attr,
+			attribute.String("sql.exception_msg", r.ErrorMsg),
+		)
+	}
 	t.createTraceEvent(l7Type.String(), ebpftracer.EventTypeL7Request.Int(), l7Type.Int(), attr...)
 }
 

+ 5 - 2
utils/modelse/id.go

@@ -3,8 +3,9 @@ package modelse
 import "strconv"
 
 const (
-	HASH_SIZE_8  = 8
-	HASH_SIZE_16 = 16
+	HASH_SIZE_8   = 8
+	HASH_SIZE_16  = 16
+	HASH_SIZE_128 = 128
 )
 
 // apm_span_context
@@ -23,6 +24,8 @@ type HashByte [HASH_SIZE_8]byte
 
 type HashByte16 [HASH_SIZE_16]byte
 
+type HashByte128 [HASH_SIZE_128]byte
+
 type INT_HASH_ID struct {
 	IntVal   int64
 	HashtVal HashByte

+ 1 - 0
utils/modelse/models.go

@@ -129,6 +129,7 @@ type RegisterAppReq struct {
 	App_type    int    `json:"app_type"`
 	HostId      int64  `json:"hostId"`
 	AppLang     string `json:"app_lang"`
+	Sys         string `json:"sys"`
 	//AppNameAnalysis []map[string]string `json:"app_name_analysis"`
 	//CwAppTransform  string              `json:"cw_app_transform"`
 	//OrgCode         string              `json:"org_code"`