Prechádzať zdrojové kódy

Feature #TASK_QT-18250 General CWOTHER add ctrl

Carl 2 mesiacov pred
rodič
commit
cac9e6545c

+ 6 - 2
containers/container_apm.go

@@ -166,6 +166,10 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			if err == nil {
 				switch r.Protocol {
 				case l7.ProtocolHTTP:
+					// cwother
+					if !c.AppInfo.CodeType.IsGoCode() {
+						r.SysvcFrom = l7.ParseHttpSysvcFrom(r.Payload)
+					}
 					method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
 					// userAgent 可以在这里使用,例如传递给 trace.TraceStartEvent
 					ip, _ := netaddr.ParseIP(sn)
@@ -174,7 +178,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 					if c.cgroup != nil {
 						container_id = c.cgroup.ContainerId
 					}
-					trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
+					trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
 					c.SendEvent(trace, r.TraceId)
 				case l7.ProtocolGrpc:
 					// gRPC
@@ -361,7 +365,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	 * Mysql
 	 */
 	case l7.ProtocolMysql:
-		fmt.Println("mysql r.TraceId:", r.TraceId)
+		//fmt.Println("mysql r.TraceId:", r.TraceId)
 		if r.Method != l7.MethodStatementClose {
 			stats.observe(r.Status.String(), "", r.Duration)
 		}

+ 14 - 4
ebpftracer/ebpf/l7/sk_msg.c

@@ -26,6 +26,14 @@ struct {
 	__type(value, __u32); // placeholder
 } sk_msg_map SEC(".maps");
 
+// cwother 开关:key=0, value=1 启用,0 或不存在则跳过
+struct {
+	__uint(type, BPF_MAP_TYPE_ARRAY);
+	__type(key, __u32);
+	__type(value, __u8);
+	__uint(max_entries, 1);
+} l4_cwother_enabled SEC(".maps");
+
 //SEC("sk_msg")
 //int sk_msg_handler(struct sk_msg_md *msg)
 //{
@@ -265,7 +273,9 @@ int sk_msg_handler(struct sk_msg_md *msg)
 	// cwtrace 写完后,在 header_stream 后面拼接 cwother
 	__u32 ins_len = CW_STREAM_HEADER_LEN - 1; // cwtrace 内容长度(不含 '\0')= 134
 
-	if (proc_info->sysvc[0] != '\0') {
+	__u32 _k0 = 0;
+	__u8 *cwother_flag = bpf_map_lookup_elem(&l4_cwother_enabled, &_k0);
+	if (cwother_flag && *cwother_flag && proc_info->sysvc[0] != '\0') {
 		// 计算 sysvc 实际长度
 		__u32 sys_val_len = 0;
 #pragma clang loop unroll(full)
@@ -288,14 +298,14 @@ int sk_msg_handler(struct sk_msg_md *msg)
 			map_data->header_stream[off + 1] = '\n';
 			off += 2;
 			ins_len = off;
-			bpf_printk("sk_msg cwother: ins_len=%u sys_val_len=%u\n", ins_len, sys_val_len);
+//			bpf_printk("sk_msg cwother: ins_len=%u sys_val_len=%u\n", ins_len, sys_val_len);
 		}
 	}
 
 	__u32 rip = __builtin_bswap32(msg->remote_ip4); // host order
 	__u16 rport = __builtin_bswap32(msg->remote_port);
 	__u16 lport = (__u16)(msg->local_port); // host order
-	bpf_printk("sk_msg: rport=%u lport=%u ins_len=%u\n", rport, lport, ins_len);
+//	bpf_printk("sk_msg: rport=%u lport=%u ins_len=%u\n", rport, lport, ins_len);
 
 	__u32 header_offset = map_data->header_offset_idx;
 	if (header_offset > msg->size){
@@ -371,7 +381,7 @@ int sk_msg_handler(struct sk_msg_md *msg)
 	}
 
 	// save span context
-	bpf_printk("sk_msg save: rip=0x%x rport=%u lport=%u\n", rip, rport, lport);
+//	bpf_printk("sk_msg save: rip=0x%x rport=%u lport=%u\n", rip, rport, lport);
 	cw_save_current_span_context_by_ipport(rip, rport, lport, cw_sc);
 
 	return SK_PASS;

+ 10 - 0
ebpftracer/l7/http.go

@@ -109,6 +109,16 @@ func ParseHttpHostWithUserAgent(payload []byte, isTls bool) (string, string, str
 	return method, uri, host, port, userAgent
 }
 
+// ParseHttpSysvcFrom 解析 HTTP 请求中的 cwother 头部,兼容常见大小写写法。
+func ParseHttpSysvcFrom(payload []byte) string {
+	for _, headerName := range []string{"CWOTHER:", "Cwother:", "cwother:"} {
+		if value := parseHttpHeader(payload, headerName); value != "" {
+			return value
+		}
+	}
+	return ""
+}
+
 // ParseElasticsearch 解析 Elasticsearch HTTP 请求体
 // 只返回请求体(Query)部分
 func ParseElasticsearch(payload []byte) string {

+ 15 - 0
ebpftracer/tracer/socket.go

@@ -58,6 +58,9 @@ func MapInsert(collection *ebpf.Collection) {
 
 	// 从flags配置获取CodeType配置,默认支持Python和Node
 	update_skmsg_header_allowed_to_map(collection)
+
+	// 设置 cwother header 开关
+	update_l4_cwother_enabled(collection)
 }
 
 func insert_output_prog_to_map(collection *ebpf.Collection) {
@@ -412,6 +415,18 @@ func parseCodeTypesFromFlags() []CodeType {
 	return codeTypes
 }
 
+func update_l4_cwother_enabled(collection *ebpf.Collection) {
+	var val uint8
+	if !*flags.DisableL4Cwother {
+		val = 1
+	}
+	_, err := bpf_table_set_any_value(collection, MAP_L4_CWOTHER_ENABLED_NAME, uint32(0), val)
+	if err != nil {
+		klog.Errorf("[kernel] update_l4_cwother_enabled err: %v", err)
+	}
+	klog.Infof("[kernel] set l4_cwother_enabled = %d", val)
+}
+
 func update_skmsg_header_allowed_to_map(collection *ebpf.Collection) {
 	codeTypes := parseCodeTypesFromFlags()
 	klog.Infof("[kernel] set l4 header allowed_to_map %s", codeTypes)

+ 5 - 4
flags/flags.go

@@ -40,10 +40,10 @@ var (
 	EnableElasticsearchDetection = kingpin.Flag("enable-es", "Enable Elasticsearch detection in HTTP requests").Default("false").Envar("ENABLE_ES").Bool()
 
 	ExternalNetworksWhitelist = kingpin.
-					Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
-					Envar("TRACK_PUBLIC_NETWORK").
-					Default("0.0.0.0/0").
-					Strings()
+		Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
+		Envar("TRACK_PUBLIC_NETWORK").
+		Default("0.0.0.0/0").
+		Strings()
 	EphemeralPortRange = kingpin.Flag("ephemeral-port-range", "Destination and Listen TCP ports from this range will be skipped").Default("").Envar("EPHEMERAL_PORT_RANGE").String()
 
 	Provider          = kingpin.Flag("provider", "`provider` label for `node_cloud_info` metric").Envar("PROVIDER").String()
@@ -69,6 +69,7 @@ var (
 	HostDirPathPrefix = kingpin.Flag("host-dir-path-prefix", "Set the prefix of path about the mount point of the host directory").Envar("HOST_DIR_PATH_PREFIX").Default("").String()
 	FuseTryMax        = kingpin.Flag("fuse_try_max", "The maximum number of the fuse operation try").Default("3").Envar("FUSE_TRY_MAX").Int()
 	L4HeaderCodeTypes = kingpin.Flag("l4-header", "L4 header code types (e.g., python,node,ruby)").Envar("L4_HEADER_CODE_TYPES").Default("python,node").String()
+	DisableL4Cwother  = kingpin.Flag("disable-l4-cwother", "Disable L4 cwother header injection").Envar("DISABLE_L4_CWOTHER").Default("false").Bool()
 	// debug
 	Test = kingpin.Flag("test", "Only test").Default("false").Envar("TEST").Bool()
 	// op 新增配置

+ 7 - 2
tracing/apm_tracing.go

@@ -177,11 +177,11 @@ func (t *Trace) MQConsumerTraceStartEvent(sn string, sport uint16, r *l7.Request
 	t.startReady()
 }
 
-func (t *Trace) TraceStartEvent(method, path, sn, ua string, sport uint16, status l7.Status, addr netaddr.IPPort, pid uint32, appInfo AppInfo, container_id string) {
+func (t *Trace) TraceStartEvent(method, path, sn, ua string, sport uint16, r *l7.RequestData, addr netaddr.IPPort, pid uint32, appInfo AppInfo, container_id string) {
 	t.span.SetAttributes(semconv.HTTPURL(fmt.Sprintf("http://%s:%d%s", sn, sport, path)),
 		semconv.HTTPMethod(method),
 		attribute.String("http.uri", path))
-	if status > 399 {
+	if r.Status > 399 {
 		t.span.SetStatus(codes.Error, "")
 	}
 	//host := fmt.Sprintf("%s:%s", sn, sport)
@@ -200,6 +200,11 @@ func (t *Trace) TraceStartEvent(method, path, sn, ua string, sport uint16, statu
 		attribute.String("server.container_id", container_id),
 		attribute.String("server.user_agent", ua),
 	}
+
+	if r.SysvcFrom != "" {
+		t.commonAttrs = append(t.commonAttrs, attribute.String("server.sysvc_from", r.SysvcFrom))
+	}
+
 	t.span.SetAttributes(t.commonAttrs...)
 	t.startReady()
 }

+ 1 - 0
utils/modelse/bpf_struct.go

@@ -27,6 +27,7 @@ const (
 
 	MAP_PROC_INFO_MAP_NAME        = "proc_info_map"
 	MAP_L4_HEADER_CODE_TYPES_NAME = "l4_header_code_types"
+	MAP_L4_CWOTHER_ENABLED_NAME   = "l4_cwother_enabled"
 	// This prog is designed to handle data transfer
 	PROGUP                         = "bpf_prog_up__"
 	PROGKP                         = "bpf_prog_kp__"