Просмотр исходного кода

Feature #TASK_QT-18250 header

Feature #TASK_QT-18250 header

Feature #TASK_QT-18250 header

Feature #TASK_QT-18250 header

Feature #TASK_QT-18250 header

Feature #TASK_QT-18250 header2

Feature #TASK_QT-18250 header2

Feature #TASK_QT-18250 header2
Carl 2 месяцев назад
Родитель
Сommit
bb55775856

+ 11 - 0
ebpftracer/ebpf/include/apm_trace.h

@@ -46,6 +46,12 @@ struct apm_trace_key_t {
 	__u64 connectid;
 };
 
+struct apm_sk_msg_key_t {
+	__u32 tgid;
+	__u32 remote_ip4 ;
+	__u16 remote_port;
+};
+
 struct fd_trace_key_t {
 	__u32 tgid;
 	__u32 fd;
@@ -104,5 +110,10 @@ static __always_inline void cw_copy_byte_arrays(unsigned char *src, unsigned cha
 		dst[i] = src[i];
 	}
 }
+static __inline __u32 has_cw_header(const char *data);
+static __always_inline void copy_byte_arrays(unsigned char *src, unsigned char *dst, __u32 size);
+static __always_inline void generate_random_bytes(unsigned char *buff, __u32 size);
+static __always_inline void hex_string_to_bytes(char *str, __u32 size, unsigned char *out);
+static __always_inline void span_context_to_cw_string_stream(struct apm_span_context *ctx, char *buff, char type_from);
 
 #endif //EUSPACES_APM_TRACE_H

+ 12 - 0
ebpftracer/ebpf/include/bpf_base.h

@@ -56,6 +56,18 @@ static long (*bpf_perf_event_output) (void *ctx, void *map, __u64 flags,
 				      void *data, __u64 size) = (void *)25;
 static long (*bpf_probe_read_str) (void *dst, __u32 size,
 				   const void *unsafe_ptr) = (void *)45;
+
+static long (*bpf_skb_load_bytes)(const void *skb, __u32 offset, void *to, __u32 len) = (void *) 26;
+static long (*bpf_skb_adjust_room)(struct __sk_buff *skb, __s32 len_diff, __u32 mode, __u64 flags) = (void *) 50;
+static long (*bpf_skb_store_bytes)(struct __sk_buff *skb, __u32 offset, const void *from, __u32 len, __u64 flags) = (void *) 9;
+static __u64 (*bpf_get_socket_cookie)(void *ctx) = (void *) 46;
+static long (*bpf_sock_hash_update)(struct bpf_sock_ops *skops, void *map, void *key, __u64 flags) = (void *) 70;
+static long (*bpf_probe_read_kernel)(void *dst, __u32 size, const void *unsafe_ptr) = (void *) 113;
+static long (*bpf_sock_map_update)(struct bpf_sock_ops *skops, void *map, void *key, __u64 flags) = (void *) 53;
+static long (*bpf_msg_pull_data)(struct sk_msg_md *msg, __u32 start, __u32 end, __u64 flags) = (void *) 63;
+static long (*bpf_msg_push_data)(struct sk_msg_md *msg, __u32 start, __u32 len, __u64 flags) = (void *) 90;
+static long (*bpf_msg_apply_bytes)(struct sk_msg_md *msg, __u32 bytes) = (void *) 61;
+
 // bpf_probe_read_user added in Linux 5.5, Instead of bpf_probe_read_user(), use bpf_probe_read() here.
 static long (*bpf_probe_read_user) (void *dst, __u32 size, const void *unsafe_ptr) = (void *)4;	// real value is 112
 

+ 2 - 1
ebpftracer/ebpf/include/common.h

@@ -74,7 +74,8 @@ enum code_type {
 	CodeTypeNetCoreAot = 1108,
 	CodeTypeLua        = 1009,
 	CodeTypeJavaC      = 1010,
-	CodeTypeRuby       = 1012
+	CodeTypeRuby       = 1012,
+	CodeTypeNginx      = 1019
 };
 
 #endif /* DF_BPF_COMMON_H */

+ 239 - 0
ebpftracer/ebpf/l7/apm_trace.c

@@ -16,6 +16,13 @@ struct {
 	__uint(max_entries, 32768);
 } apm_current_span_context_map SEC(".maps");
 
+struct {
+	__uint(type, BPF_MAP_TYPE_LRU_HASH);
+	__uint(key_size, sizeof(struct apm_sk_msg_key_t));
+	__uint(value_size, sizeof(struct apm_span_context));
+	__uint(max_entries, 32768);
+} apm_current_span_context_by_ipport_map SEC(".maps");
+
 struct {
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
 	__uint(key_size, sizeof(struct apm_trace_key_t));
@@ -63,6 +70,43 @@ struct {
 	__uint(max_entries, 10240);
 } pid_of_connection_ptr_maps SEC(".maps");
 
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__uint(key_size, sizeof(__u16));
+	__uint(value_size, sizeof(__u8));
+	__uint(max_entries, 64);
+} l4_header_code_types SEC(".maps");
+
+#define MAX_LEN 1022
+#define L7_IOVEC_BUF_SIZE 1024
+#define MAX_L7_IOVEC_BUF_SIZE 10240
+
+struct sock_t {
+	__u32 size;
+	__u32 header_offset_idx;
+	__u32 host_offset_idx;
+	__u32 end_str_len;
+	void * payload_char_p;
+	void * payload_len_p;
+	char payload[MAX_LEN];
+	char header_stream[CW_STREAM_HEADER_LEN];
+	char host[30];
+	struct ebpf_proc_info* proc_info;
+};
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__type(key, __u32);
+	__type(value, struct sock_t);
+	__uint(max_entries, 1);
+} socket_heap SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__type(key, int);
+	__type(value, struct apm_span_context);
+	__uint(max_entries, 1);
+} apm_span_context_heap SEC(".maps");
+
 static __inline __attribute__((__always_inline__))
 struct apm_trace_key_t get_apm_trace_key(__u64 timeout, bool is_socket_io) {
 	__u64 pid_tgid = bpf_get_current_pid_tgid();
@@ -444,6 +488,46 @@ struct apm_span_context *cw_get_current_tracking_span(struct apm_trace_info_t *t
 	return apm_sc;
 }
 
+
+static __inline __attribute__((__always_inline__))
+void cw_save_current_span_context_by_ipport(__u32 remote_ip4, __u16 remote_port, struct apm_span_context *sc) {
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	struct apm_sk_msg_key_t sk_msg_key = {};
+	sk_msg_key.tgid = (__u32) (pid_tgid >> 32);
+	sk_msg_key.remote_ip4 = remote_ip4;
+	sk_msg_key.remote_port = remote_port;
+	bpf_map_update_elem(&apm_current_span_context_by_ipport_map, &sk_msg_key, sc, BPF_ANY);
+}
+
+static __inline __attribute__((__always_inline__))
+struct apm_span_context *cw_get_current_span_context_by_ipport(__u32 remote_ip4, __u16 remote_port) {
+	struct apm_span_context *apm_sc = {0};
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	struct apm_sk_msg_key_t sk_msg_key = {};
+	sk_msg_key.tgid = (__u32) (pid_tgid >> 32);
+	sk_msg_key.remote_ip4 = remote_ip4;
+	sk_msg_key.remote_port = remote_port;
+//	bpf_printk("sk_msg_key.remote_ip4 [%llu]",sk_msg_key.remote_ip4);
+//	bpf_printk("sk_msg_key.remote_port [%llu]",sk_msg_key.remote_port);
+	struct apm_span_context *span_contexts = bpf_map_lookup_elem(&apm_current_span_context_by_ipport_map, &sk_msg_key);
+	if (span_contexts) {
+		apm_sc = span_contexts;
+//		for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
+//			bpf_printk("cw_get_current_tracking_span-assumed_app_id[%d] = %02x", i, span_contexts->assumed_app_id[i]);
+//		}
+	}
+	return apm_sc;
+}
+
+static __inline __attribute__((__always_inline__))
+void cw_clean_current_span_context_by_ipport(__u32 remote_ip4, __u16 remote_port, __u32 tgid) {
+	struct apm_sk_msg_key_t sk_msg_key = {};
+	sk_msg_key.tgid = tgid;
+	sk_msg_key.remote_ip4 = remote_ip4;
+	sk_msg_key.remote_port = remote_port;
+	bpf_map_delete_elem(&goid_trace_info_heap, &sk_msg_key);
+}
+
 static __inline __attribute__((__always_inline__))
 __u32 cw_add_event_count(__u64 trace_id) {
 	__u32 *event_count = bpf_map_lookup_elem(&trace_event_count_heap, &trace_id);
@@ -499,3 +583,158 @@ struct apm_trace_info_t cw_save_trace_info(__u64 id, __u32 pid, __u64 fd) {
 	bpf_map_update_elem(&trace_event_count_heap, &trace_info.trace_id, &event_count, BPF_NOEXIST);
 	return trace_info;
 }
+
+/* 直接从 ip_net (u32 network order) + port_field (u32 raw msg->remote_port)
+ * 提取高16位 port(network order),并直接生成 8 字节 每字节 0..9 的 assumed_app_id_out
+ */
+static __always_inline void ipport_to_16digits_from_u32_high(__u32 ip_net, __u32 port_field, unsigned char out[16]) {
+	// 1) 提取 port 高 16 位(network-order),和 ip 的 4 字节(network-order)
+	__u16 port_net_high = (__u16)((port_field >> 16) & 0xffff);
+
+	__u8 b0 = (ip_net >> 24) & 0xFF;
+	__u8 b1 = (ip_net >> 16) & 0xFF;
+	__u8 b2 = (ip_net >> 8) & 0xFF;
+	__u8 b3 = (ip_net) & 0xFF;
+	__u8 p0 = (port_net_high >> 8) & 0xFF;
+	__u8 p1 = (port_net_high) & 0xFF;
+
+	// 2) 两轮 FNV-1a:h1 和 h2(h2 用不同种子/tweak),各自产出 8 字节,共 16 字节 raw
+	uint64_t h1 = 14695981039346656037ULL; // FNV offset basis
+#pragma clang loop unroll(full)
+	for (int i = 0; i < 6; i++) {
+		__u8 v = (i==0)?b0: (i==1)?b1: (i==2)?b2: (i==3)?b3: (i==4)?p0: p1;
+		h1 ^= (uint64_t)v;
+		h1 *= (uint64_t)1099511628211ULL;
+	}
+	uint64_t h2 = 14695981039346656037ULL ^ 0x9e3779b97f4a7c15ULL; // tweak seed
+#pragma clang loop unroll(full)
+	for (int i = 0; i < 6; i++) {
+		__u8 v = (i==0)?b0: (i==1)?b1: (i==2)?b2: (i==3)?b3: (i==4)?p0: p1;
+		h2 ^= (uint64_t)v;
+		h2 *= (uint64_t)1099511628211ULL;
+	}
+
+	__u8 raw[16];
+	// h1 -> raw[0..7] (big-endian)
+	raw[0] = (h1 >> 56) & 0xFF;
+	raw[1] = (h1 >> 48) & 0xFF;
+	raw[2] = (h1 >> 40) & 0xFF;
+	raw[3] = (h1 >> 32) & 0xFF;
+	raw[4] = (h1 >> 24) & 0xFF;
+	raw[5] = (h1 >> 16) & 0xFF;
+	raw[6] = (h1 >> 8) & 0xFF;
+	raw[7] = (h1) & 0xFF;
+	// h2 -> raw[8..15] (big-endian)
+	raw[8]  = (h2 >> 56) & 0xFF;
+	raw[9]  = (h2 >> 48) & 0xFF;
+	raw[10] = (h2 >> 40) & 0xFF;
+	raw[11] = (h2 >> 32) & 0xFF;
+	raw[12] = (h2 >> 24) & 0xFF;
+	raw[13] = (h2 >> 16) & 0xFF;
+	raw[14] = (h2 >> 8) & 0xFF;
+	raw[15] = (h2) & 0xFF;
+
+	// 3) 把每个 raw byte 映射到 0..9(用 mod10_u8 不用除法)
+#pragma clang loop unroll(full)
+	for (int i = 0; i < 16; i++) {
+		__u8 x = raw[i];
+		__u32 q = ((__u32)x * 205U) >> 11; // approximate x/10
+		__u8 r = ( __u8 )( x - ( (__u8)(q * 10U) ) ); // r == x % 10 for 0..255
+		out[i] = (unsigned char) r;
+	}
+
+	// 4) 确保第 0 字节不是 0(按你历史逻辑)
+	if (out[0] == 0) out[0] = 1;
+}
+
+
+static __inline struct apm_span_context * build_sc_by_ipport(struct ebpf_proc_info proc_info, __u32 rip_net, __u32 raw_port) {
+	__u32 key = 0;
+	struct apm_span_context *cw_sc = bpf_map_lookup_elem(&apm_span_context_heap, &key);
+
+	if (cw_sc == NULL) {
+		return NULL;
+	}
+
+	struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+	if (cw_psc) {
+		copy_byte_arrays(cw_psc->trace_id, cw_sc->trace_id, APM_TRACE_ID_SIZE);
+	}
+	// new spanid
+	generate_random_bytes(cw_sc->span_id, APM_SPAN_ID_SIZE);
+
+	// set host_id/appid
+	/*__u32 k0 = 0;
+	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
+	if (trace_conf) {
+//		for (int i = 0; i < 8; ++i) {
+//			cw_bpf_debug("[Client] host_id:%02x", trace_conf->host_id[i]);
+//		}
+		copy_byte_arrays(trace_conf->host_id, cw_sc->host_id, APM_HOST_ID_SIZE);
+//		copy_byte_arrays(trace_conf->app_id, cw_sc->app_id, APM_APP_ID_SIZE);
+	}*/
+
+	copy_byte_arrays(proc_info.instance_id, cw_sc->instance_id, APM_APP_ID_SIZE);
+	copy_byte_arrays(proc_info.app_id, cw_sc->app_id, APM_APP_ID_SIZE);
+
+	// set assumed_app_id
+	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_STRING_SIZE];
+	ipport_to_16digits_from_u32_high(rip_net, raw_port, assumed_app_id);
+	hex_string_to_bytes((char *) assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE, cw_sc->assumed_app_id);
+
+//	set_assumed_app_id_arrays(host, cw_sc->assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+	return cw_sc;
+}
+
+/* 返回语言名称的函数:例如 "Go","Java","Nodejs","Nginx", ".net" ... */
+static __always_inline const char *code_type_to_name(__u16 code_type) {
+	switch (code_type) {
+		case CodeTypeGo:                  return "Go";        // 1006
+		case CodeTypeJava:
+		case CodeTypeJavaAot:
+		case CodeTypeJavaC:               return "Java";      // 1002,1102,1010 -> Java family
+		case CodeTypeNode:                return "Nodejs";    // 1005
+		case CodeTypePHP:                 return "PHP";       // 1001
+		case CodeTypePython:              return "Python";    // 1003
+		case CodeTypeDotNet:              return ".net";      // 1004
+		case CodeTypeNetCore:
+		case CodeTypeNetCoreAot:          return ".netCore";  // 1008,1108
+		case CodeTypeC:                   return "C";         // 1007
+		case CodeTypeWaitCheck:           return "WaitCheck"; // 0
+//		case CodeTypeUnknown:             return "Unknown";   // -1
+		default:
+			return "Unknown";
+	}
+}
+
+/* 返回两位数字码的函数:例如 "00" -> Go, "01" -> Java ... */
+static __always_inline  char code_type_to_code(__u16 code_type) {
+	switch (code_type) {
+		case CodeTypeGo:                  return '0'; // Go
+		case CodeTypeJava:
+		case CodeTypeJavaAot:
+		case CodeTypeJavaC:               return '1'; // Java family
+		case CodeTypeNode:                return '2'; // Nodejs
+		case CodeTypeNginx:               return '3'; // Nginx
+		case CodeTypeDotNet:              return '4'; // .net
+		case CodeTypeNetCore:
+		case CodeTypeNetCoreAot:          return '5'; // .netCore
+		case CodeTypePHP:                 return '6'; // PHP
+		case CodeTypePython:              return '7'; // Python
+		case CodeTypeC:                   return '8'; // C
+		default:
+			return '0'; // unknown / fallback
+	}
+}
+
+// 假定已有:
+// struct ebpf_proc_info { __u16 code_type; /* ... */ };
+// BPF map 已声明为 proc_info_map
+
+static __always_inline int mk_header_in_sk_msg(__u16 code_type) {
+	/* 在 allowed_code_types 中查找 */
+	__u8 *p = bpf_map_lookup_elem(&l4_header_code_types, &code_type);
+	if (p && *p)
+		return 1;
+	return 0;
+}

+ 33 - 0
ebpftracer/ebpf/l7/http.c

@@ -59,3 +59,36 @@ int is_http_response(char *buf, __u32 *status) {
     *status = (b[9]-'0')*100 + (b[10]-'0')*10 + (b[11]-'0');
     return 1;
 }
+
+static __always_inline
+int is_http_request_in_sk(const char *buf) {
+	char b[16];
+	if (bpf_probe_read_kernel(b, 16, (void *)buf) != 0) {
+		return 0;
+	}
+	if (b[0] == 'G' && b[1] == 'E' && b[2] == 'T') {
+		return 1;
+	}
+	if (b[0] == 'P' && b[1] == 'O' && b[2] == 'S' && b[3] == 'T') {
+		return 1;
+	}
+	if (b[0] == 'H' && b[1] == 'E' && b[2] == 'A' && b[3] == 'D') {
+		return 1;
+	}
+	if (b[0] == 'P' && b[1] == 'U' && b[2] == 'T') {
+		return 1;
+	}
+	if (b[0] == 'D' && b[1] == 'E' && b[2] == 'L' && b[3] == 'E' && b[4] == 'T' && b[5] == 'E') {
+		return 1;
+	}
+	if (b[0] == 'C' && b[1] == 'O' && b[2] == 'N' && b[3] == 'N' && b[4] == 'E' && b[5] == 'C' && b[6] == 'T') {
+		return 1;
+	}
+	if (b[0] == 'O' && b[1] == 'P' && b[2] == 'T' && b[3] == 'I' && b[4] == 'O' && b[5] == 'N' && b[6] == 'S') {
+		return 1;
+	}
+	if (b[0] == 'P' && b[1] == 'A' && b[2] == 'T' && b[3] == 'C' && b[4] == 'H') {
+		return 1;
+	}
+	return 0;
+}

+ 592 - 12
ebpftracer/ebpf/l7/l7.c

@@ -124,6 +124,30 @@ struct {
     __uint(value_size, sizeof(int));
 } l7_events SEC(".maps");
 
+// sockhash map for stream_verdict program
+struct {
+	__uint(type, BPF_MAP_TYPE_SOCKHASH);
+	__uint(max_entries, 65535);
+	__type(key, struct bpf_sock_tuple);
+	__type(value, __u32); // socket cookie
+} sockhash SEC(".maps");
+
+// 用于在cgroup/skb和sk_skb程序间传递信息的map
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__uint(max_entries, 65535);
+	__type(key, __u32); // socket cookie
+	__type(value, __u32); // flag: 1=需要修改HTTP请求
+} http_modify_flags SEC(".maps");
+
+// sk_msg map for sk_msg program
+struct {
+	__uint(type, BPF_MAP_TYPE_SOCKMAP);
+	__uint(max_entries, 65535);
+	__type(key, __u32); // socket cookie
+	__type(value, __u32); // placeholder
+} sk_msg_map SEC(".maps");
+
 struct read_args {
     __u64 fd;
     char* buf;
@@ -252,7 +276,9 @@ struct {
 } apm_span_context_heap3 SEC(".maps");
 
 #define TRACE_ID_SIZE 16
-static __inline __u32 has_cw_header(const char *data);
+
+
+
 static __always_inline void cw_string_to_span_context(char *str, struct apm_span_context *ctx);
 static __always_inline void generate_random_bytes(unsigned char *buff, __u32 size);
 static __inline __attribute__((__always_inline__)) void cw_save_parent_tracking_span(struct apm_span_context *sc);
@@ -701,6 +727,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 }
 
 static inline __attribute__((__always_inline__))
+//int trace_enter_read(__u64 id, __u32 pid, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
 int trace_enter_read(int _type , __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
 	__u64 id = bpf_get_current_pid_tgid();
 	__u32 pid = id >> 32;
@@ -934,30 +961,52 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     COPY_PAYLOAD(e->payload, req->payload_size, req->payload);
 
     bpf_map_delete_elem(&active_l7_requests, &k);
-//	cw_bpf_debug("delete req--------:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
 	if (e->protocol == PROTOCOL_HTTP) {
+//		bpf_printk("Response -------: k.pid:%d, k.fd:%d", k.pid, k.fd);
 //		__u64 trace_id = req->trace_id;
 		e->trace_id = req->trace_id;
-        cw_bpf_debug("l7.c addr is --------:%d,%s",conn->sport,conn->saddr);
+//      cw_bpf_debug("l7.c addr is --------:%d,%s",conn->sport,conn->saddr);
+
+//		bpf_printk("l7.c src  addr is --------:%pI4:%d",conn->saddr,conn->sport);
+//		bpf_printk("l7.c dist addr is --------:%pI4:%d",conn->daddr,conn->dport);
         e->component_sport = conn->sport;
         e->component_dport = conn->dport;
         __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
         __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
-//	    struct  apm_span_context * sc = cw_get_current_tracking_span();
-//	    if (sc) {
-		cw_copy_byte_arrays(req->assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
-		cw_copy_byte_arrays(req->span_id, e->span_id, APM_SPAN_ID_SIZE);
-//		    for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
-//			    cw_bpf_debug("assumed_app_id-assumed_app_id[%d] = %02x", i, req->assumed_app_id[i]);
-//		    }
-//	    for (int i = 0; i < APM_SPAN_ID_SIZE; i++) {
+//		bpf_printk("l7.c src  addr is --------:%pI4:%d",&e->component_saddr,e->component_sport);
+
+
+		__be32 v4 = *(__be32 *)&e->component_daddr[12];  // 取最后4字节
+		__u32  rip  = bpf_ntohl(v4);                     // 转主机序
+//		bpf_printk("comp src v4: %u.%u.%u",
+//		           (rip >> 24) & 0xff,
+//		           (rip >> 16) & 0xff,
+//		           (rip >>  8) & 0xff
+////		           (rip >>  0) & 0xff
+//		           );
+//		bpf_printk("%u\n",(rip >>  0) & 0xff);
+		unsigned char *src_assumed;
+		unsigned char *src_span;
+		struct apm_span_context *sk_msg_sc = cw_get_current_span_context_by_ipport(rip, conn->dport);
+		if (sk_msg_sc) {
+			src_assumed = sk_msg_sc->assumed_app_id;
+			src_span    = sk_msg_sc->span_id;
+			cw_clean_current_span_context_by_ipport(rip, conn->dport, pid);
+		} else {
+			src_assumed = req->assumed_app_id;
+			src_span    = req->span_id;
+		}
+		cw_copy_byte_arrays(src_assumed, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+		cw_copy_byte_arrays(src_span,    e->span_id,         APM_SPAN_ID_SIZE);
+
+		//	    for (int i = 0; i < APM_SPAN_ID_SIZE; i++) {
 //			    cw_bpf_debug("cw_get_current_tracking_span-span_id[%d] = %02x", i, req->span_id[i]);
 //		    }
 //	    }
 		//        cw_bpf_debug("[Response][HTTP222]:thread_id:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
 //        cw_bpf_debug("[Response][HTTP222] trace_id:%llu", trace_id);
 //        // 请求报文
-//	    cw_bpf_debug("[Response][HTTP222] req-payload:%s",e->payload);
+//	    bpf_printk("[Response][HTTP222] req-payload:%s",e->payload);
 //        // 响应报文
 //        cw_bpf_debug("[Response][HTTP222] resp-payload:%s",payload);
 
@@ -1316,6 +1365,537 @@ int sys_exit_recvfrom(struct trace_event_raw_sys_exit_rw__stub* ctx) {
     __u32 pid = pid_tgid >> 32;
     return trace_exit_read(ctx, pid_tgid, pid, 0, ctx->ret);
 }
+
+//SEC("sk_msg")
+//int sk_msg_handler(struct sk_msg_md *msg)
+//{
+//	const char header[] = "X-Debug: 1234\r\n";
+//	const int hdr_len = sizeof(header) - 1; // 不含结尾 '\0'
+//
+//	// 在最开头插入 hdr_len 字节
+//	if (bpf_msg_push_data(msg, 0, hdr_len, 0) < 0) {
+//		bpf_printk("push_data failed\n");
+//		return SK_DROP;
+//	}
+//
+//	// 把新数据段线性化,确保能写
+//	if (bpf_msg_pull_data(msg, 0, hdr_len, 0) < 0) {
+//		bpf_printk("pull_data failed\n");
+//		return SK_DROP;
+//	}
+//
+//	char *data = (char *)(long)msg->data;
+//	char *data_end = (char *)(long)msg->data_end;
+//
+//	// 写入 header 字符串
+//#pragma clang loop unroll(full)
+//	for (int i = 0; i < hdr_len; i++) {
+//		if (data + i + 1 > data_end)
+//			break;
+//		*(char *)(data + i) = header[i];
+//	}
+//
+//	bpf_printk("Inserted header len=%d\n", hdr_len);
+//	return SK_PASS;
+//}
+
+// sockops程序:将socket添加到sockhash map中
+SEC("sockops")
+int sockops_cb(struct bpf_sock_ops *skops) {
+	// 添加调试信息
+//	bpf_printk("sockops: received event, op=%d, family=%d\n", skops->op, skops->family);
+
+	// 只处理TCP连接
+	if (skops->family != AF_INET) {
+		return 0;
+	}
+
+	// 只处理客户端主动建立的连接(出站连接)
+	// 这样stream_verdict就能看到客户端发送的HTTP请求
+	if (skops->op != BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB) {
+		return 0;
+	}
+	__u32 saddr = bpf_ntohl(skops->local_ip4);
+	__u32 daddr = bpf_ntohl(skops->remote_ip4);
+	__u16 sport = bpf_ntohs(skops->local_port);
+	__u16 dport = bpf_ntohs(skops->remote_port);
+//	bpf_printk("sockops op=%d src=%08x:%u\n", skops->op, saddr, sport);
+//	bpf_printk("        dst=%08x:%u\n", daddr, dport);
+
+	// 创建socket tuple
+	struct bpf_sock_tuple tuple = {};
+	tuple.ipv4.saddr = skops->local_ip4;
+	tuple.ipv4.daddr = skops->remote_ip4;
+	tuple.ipv4.sport = skops->local_port;
+	tuple.ipv4.dport = skops->remote_port;
+
+	// 将socket添加到sockhash map
+	__u32 cookie = bpf_get_socket_cookie(skops);
+	bpf_sock_hash_update(skops, &sockhash, &tuple, BPF_ANY);
+
+	// 同时将socket添加到sockmap(用于sk_msg程序)
+	bpf_sock_map_update(skops, &sk_msg_map, &cookie, BPF_ANY);
+
+//	bpf_printk("sockops: added socket to sockhash and sockmap, cookie=%u\n", cookie);
+
+	return 0;
+}
+
+// 专门处理HTTP请求的程序 - 使用cgroup/skb
+SEC("cgroup/skb")
+int http_request_handler(struct __sk_buff *skb) {
+	// 检查数据包长度
+//	if (skb->len < 4) {
+//		return 1; // 允许通过
+//	}
+//
+//	// 读取前几个字节来检查是否是HTTP请求
+//	char buf[100] = {};
+//	if (bpf_skb_load_bytes(skb, 0, buf, 100) < 0) {
+//		return 1; // 允许通过
+//	}
+//
+//
+//	// 跳过IP头部(20字节)和TCP头部(20字节),直接读取应用层数据
+//	// IP头部长度 = (buf[0] & 0x0F) * 4 = 5 * 4 = 20字节
+//	// TCP头部长度 = (buf[32] >> 4) * 4,但通常也是20字节
+//	int ip_header_len = (buf[0] & 0x0F) * 4;
+//	int tcp_header_len = 20; // 通常TCP头部是20字节
+//	int app_data_offset = ip_header_len + tcp_header_len;
+//
+//	// 检查是否有足够的应用层数据
+//	if (skb->len < app_data_offset + 4) {
+//		return 1; // 允许通过
+//	}
+//
+//	// 读取应用层数据的前几个字节
+//	char app_data[100] = {};
+//	if (bpf_skb_load_bytes(skb, app_data_offset, app_data, 100) < 0) {
+//		return 1; // 允许通过
+//	}
+//
+//	// 检查是否是HTTP请求
+//	if ((app_data[0] == 'G' && app_data[1] == 'E' && app_data[2] == 'T' && app_data[3] == ' ') ||
+//	    (app_data[0] == 'P' && app_data[1] == 'O' && app_data[2] == 'S' && app_data[3] == 'T') ||
+//	    (app_data[0] == 'P' && app_data[1] == 'U' && app_data[2] == 'T' && app_data[3] == ' ') ||
+//	    (app_data[0] == 'D' && app_data[1] == 'E' && app_data[2] == 'L' && app_data[3] == 'E') ||
+//	    (app_data[0] == 'H' && app_data[1] == 'E' && app_data[2] == 'A' && app_data[3] == 'D') ||
+//	    (app_data[0] == 'O' && app_data[1] == 'P' && app_data[2] == 'T' && app_data[3] == 'I')) {
+//
+////		bpf_printk("=== HTTP REQUEST FOUND ===\n");
+////		bpf_printk("IP header len: %d, TCP header len: %d\n", ip_header_len, tcp_header_len);
+////		bpf_printk("App data offset: %d\n", app_data_offset);
+////		bpf_printk("Content: %s\n", app_data);
+////		bpf_printk("=== END HTTP REQUEST ===\n");
+//
+//		// 标记这个socket需要修改HTTP请求
+//		// 通过http_modify_flags map传递信息给stream_verdict程序
+//		__u32 cookie = bpf_get_socket_cookie(skb);
+//		__u32 flag = 1; // 标记需要修改
+//		if (bpf_map_update_elem(&http_modify_flags, &cookie, &flag, BPF_ANY) == 0) {
+//			bpf_printk("Marked socket %u for HTTP request modification\n", cookie);
+//		}
+//	}
+//
+	return 1; // 允许数据包通过
+}
+
+// sk_msg program for handling socket messages
+SEC("sk_msg")
+int sk_msg_handler(struct sk_msg_md *msg)
+{
+	long ret;
+	if (bpf_msg_pull_data(msg, 0, 16, 0) < 0)
+		return SK_PASS;
+
+	void *p = (void *) (long) msg->data;
+	void *e = (void *) (long) msg->data_end;
+	if (!is_http_request_in_sk((void *) (__u64) msg->data)) {
+		return SK_PASS;
+	}
+
+	__u32 tgid = (__u32) (bpf_get_current_pid_tgid() >> 32);
+	struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (!proc_info) {
+		return SK_PASS;
+	}
+
+	if (!mk_header_in_sk_msg(proc_info->code_type)) {
+//		bpf_printk("not allowd");
+		return SK_PASS;
+	}
+
+	__u32 max_buf = msg->size < MAX_L7_IOVEC_BUF_SIZE ? msg->size : MAX_L7_IOVEC_BUF_SIZE;
+
+	// 拉满整条消息查Header
+	if (bpf_msg_pull_data(msg, 0, max_buf, 0) < 0){
+//		bpf_printk("HTTP request return");
+		return SK_PASS;
+	}
+//	bpf_printk("HTTP request return2");
+//	bpf_printk("HTTP request return2 :%d",  (__u64)msg->data_end - (__u64)msg->data);
+
+//	return SK_PASS;
+
+
+	__u32 k0 = 0;
+	struct sock_t *map_data = bpf_map_lookup_elem(&socket_heap, &k0);
+	if (!map_data) {
+		cw_bpf_debug("[java client] Failed to lookup socket_heap\n");
+		return SK_PASS;
+	}
+
+	if (bpf_probe_read_kernel(map_data->payload, sizeof(map_data->payload), (void *) (__u64) msg->data) != 0) {
+		return SK_PASS;
+	}
+//	bpf_printk("HTTP request: %s",map_data->payload);
+
+	// header 查询
+	long header_start_native = 0x0a0d312e312f5054LL; // 小端序下的 "TP/1.1\r\n" (0x54 0x50 0x2f 0x31 0x2e 0x31 0x0d 0x0a)
+#pragma clang loop unroll(full)
+	for (__u32 i = 0; i < 200 - 8; i++) {
+		long data = *(long long *) (map_data->payload + i);
+		if (data == header_start_native) {
+			map_data->header_offset_idx = i + 8;
+			break;
+		}
+	}
+
+	if (map_data->header_offset_idx == 0) {
+		bpf_printk("[java client] header_offset_idx error:\n");
+		return SK_PASS;
+	}
+
+//	bpf_printk("HTTP header_offset_idx: %d\n", map_data->header_offset_idx);
+
+	if (map_data->header_offset_idx > msg->size){
+		return SK_PASS;
+	}
+
+	// 先拉直 [0, offset) 让插入点可达
+	if (bpf_msg_pull_data(msg, 0, map_data->header_offset_idx, 0) < 0){
+		bpf_printk("HTTP request return");
+		return SK_PASS;
+	}
+
+	// TODO real Header
+//	const char insert[] = "AA: a\r\n";   // 要插入的 7 字节
+//	const char insert[CW_STREAM_HEADER_LEN] = "cwtrace: 00:00:1015481350055581:5450531005555981:5610250100539899:304775019cd3218a304775019cd3218a:1001025098564810:140acc88cde8773f\r\n";
+
+//	int ins_len = sizeof(insert) - 1;  // 不含 '\0'
+
+
+//	__u32 rip_net = msg->remote_ip4;           // network order
+//	__u32 raw_port = msg->remote_port;         // 32-bit field
+//	unsigned char assumed_app_id[16];
+//	ipport_to_8digits_from_u32_high(rip_net, raw_port, assumed_app_id);
+
+//	for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
+//	    bpf_printk("assumed_app_id-assumed_app_id[%d] = %02x", i, assumed_app_id[i]);
+//	}
+
+	struct apm_span_context *cw_sc = build_sc_by_ipport(*proc_info, msg->remote_ip4, msg->remote_port);
+	if (cw_sc == NULL) {
+		return SK_PASS;
+	}
+
+
+	span_context_to_cw_string_stream(cw_sc, map_data->header_stream, code_type_to_code(proc_info->code_type));
+//	for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
+//		bpf_printk("cw-assumed_app_id[%d] = %02x", i, cw_sc->assumed_app_id[i]);
+//	}
+
+//	bpf_printk("[client] header:[%s],sizeof %d ", map_data->header_stream, sizeof(map_data->header_stream));
+	__u32 tid =  (__u32)bpf_get_current_pid_tgid();
+//	bpf_printk("tid-set %llu",tid);
+
+//	__u32 raw = msg->remote_port;
+//	bpf_printk("raw_port=0x%x low_net=0x%x high_net=0x%x\n",raw, (raw & 0xffff), ((raw >> 16) & 0xffff));
+	__u16 port_host = ( __u16 )( __builtin_bswap32(msg->remote_port) & 0xffff );
+
+	__u32 rip = __builtin_bswap32(msg->remote_ip4); // host order
+	__u16 rport = __builtin_bswap32(msg->remote_port);
+//	__u8 b0 = (rip >> 24) & 0xff;
+//	__u8 b1 = (rip >> 16) & 0xff;
+//	__u8 b2 = (rip >> 8) & 0xff;
+//	__u8 b3 = rip & 0xff;
+//	bpf_printk("remote %u.%u.%u\n", b0, b1, b2);
+//	bpf_printk("remote port %u:%u\n", b3, rport);
+//	bpf_printk("remote port %u\n", rport);
+
+
+	__u32 ins_len = sizeof(map_data->header_stream) - 1;
+	__u32 header_offset = map_data->header_offset_idx;
+
+	if (header_offset > msg->size){
+		return SK_PASS;
+	}
+
+	// 2) 执行插入
+	ret = bpf_msg_push_data(msg, header_offset, ins_len, 0);
+	if (ret) {
+		bpf_printk("push off=%u len=%u ret=%ld\n", header_offset, ins_len, ret);
+		return SK_PASS;
+	}
+
+//	bpf_printk("bpf_msg_push_data suc.");
+
+	// push_data 后;拉到写满Header处的长度
+//	__u32 need = msg->size < MAX_L7_IOVEC_BUF_SIZE ? msg->size : MAX_L7_IOVEC_BUF_SIZE;
+
+	ret = bpf_msg_pull_data(msg, 0, header_offset + ins_len, 0);
+	if (ret) {
+		bpf_printk("pull 0..%u fail ret=%ld\n", header_offset + ins_len, ret);
+		return SK_PASS;
+	}
+//	bpf_printk("bpf_msg_pull_data suc.");
+
+	// 3. 写入插入的内容
+	char *data     = (char *)(long)msg->data;
+	char *data_end = (char *)(long)msg->data_end;
+	__u64 len =(__u32)(data_end - data);
+
+//	__u32 header_off = offset;
+
+	// 运行时长度 n:先限制到常量 MAX_INS,再裁剪到剩余空间
+//	__u32 n = ins_len;                  // 运行时的插入/覆盖长度
+
+	if (header_offset > len)
+		return SK_PASS;
+
+#pragma clang loop unroll(full)
+	for (int i = 0; i < MAX_L7_IOVEC_BUF_SIZE; i++) {
+		/*test1*/
+		if (i >= header_offset) {
+			for (int j = 0; j < ins_len; j++) {
+				if (data + i + 1 > data_end) break;
+				*(char *) (data + i) = map_data->header_stream[j];
+//				bpf_printk("set-%d %d", i, j);
+				i++;
+			}
+			break;
+		}
+		if (i > header_offset) break;
+		/*test1 end*/
+
+
+		/*test2
+		 * if (i < header_offset){
+			bpf_printk("continue-%d",i);
+			continue;
+		}
+		if (data + i + 1 > data_end) break;
+		if (i == header_offset + ins_len){
+			bpf_printk("break-%d",i);
+			break;
+		}
+		for (int j = 0; j < ins_len; j++) {
+			if (data + i + 1 > data_end) break;
+			*(char *)(data + i) = insert[j];
+			bpf_printk("set-%d %d", i, j);
+			i++;
+		}
+		break;*/
+
+	}
+
+	// save span context
+	cw_save_current_span_context_by_ipport(rip, rport, cw_sc);
+
+//	char vb[200] = {};
+//	if (bpf_probe_read_kernel(vb, 200, (void *)data) == 0) {
+//		bpf_printk("Modified HTTP request: %s\n", vb);
+//		bpf_printk("Modified HTTP request: %d\n", (long) msg->data_end - (long) msg->data);
+//	}
+	return SK_PASS;
+
+//	bpf_printk("sk_msg");
+//	const char header[] = "X-Debug: 1234\r\n";
+//	const int hdr_len = sizeof(header) - 1; // 不含结尾 '\0'
+	
+	// 读取HTTP请求数据
+//	__u64 data_start = (__u64)msg->data;
+//	__u64 data_end = (__u64)msg->data_end;
+//	__u64 data_len = (__u64)msg->data_end - (__u64)msg->data;
+
+	// 查找第一行结束位置(\r\n)
+//	GET /aaaaaa HTTP/1.1
+//	int first_line_end = 22;
+//
+//
+//	if (first_line_end == -1) {
+//		bpf_printk("No first line found\n");
+//		return SK_PASS;
+//	}
+//
+////	bpf_printk("First line ends at offset: %d\n", first_line_end);
+//	// 先测试基本的读取功能
+//	char test_buf[200] = {};
+//	if (bpf_probe_read_kernel(test_buf, 200, (void *)(__u64)msg->data) == 0) {
+//		bpf_printk("HTTP request: %s\n", test_buf);
+//	}
+//	if (test_buf[0] != 'G' && test_buf[1] != 'E' && test_buf[1] != 'T') {
+//		return SK_PASS;
+//	}
+//	bpf_printk("data_len %d",msg->size);
+//
+//	// 尝试插入自定义头部
+//	bpf_printk("Attempting to insert header at offset %d\n", first_line_end);
+//	/*test*/
+////	const char insert[] = "AA: a\r\n";   // 要插入的 7 字节
+////	int ins_len = sizeof(insert) - 1;  // 不含 '\0'
+////	int offset = 22;  // 偏移位置,例:在 "GET /" 后面插入
+//
+//	// 1) 先拉直 [0, offset) 让插入点可达(注意第三参是长度)
+//	 ret = bpf_msg_pull_data(msg, 0, offset, 0);
+//	if (ret) {
+//		bpf_printk("pull pre off=%u len=%u ret=%ld\n", offset, offset, ret);
+//		return SK_PASS;
+//	}
+//
+//	// 2) 执行插入
+//	ret = bpf_msg_push_data(msg, offset, ins_len, 0);
+//	if (ret) {
+//		bpf_printk("push off=%u len=%u ret=%ld\n", offset, ins_len, ret);
+//		return SK_PASS;
+//	}
+//
+//	// 插入后
+////	__u32 need = msg->size;          // 拉满整条消息
+//// 可选:上限保护,避免一次拉太大导致失败
+////	if (need > 4096) need = 4096;    // 例如先最多拉 4KB
+//
+////	ret = bpf_msg_pull_data(msg, 0, need, 0);
+////	if (ret) {
+////		bpf_printk("pull 0..%u fail ret=%ld\n", need, ret);
+////		return SK_PASS;
+////	}
+//
+//	// 3. 写入插入的内容
+////	char *data     = (char *)(long)msg->data;
+////	char *data_end = (char *)(long)msg->data_end;
+//
+//#pragma clang loop unroll(full)
+//	for (int i = 0; i < ins_len; i++) {
+//		if (data + offset + i + 1 > data_end) break;
+//		*(char *)(data + offset + i) = insert[i];
+//	}
+//
+////	bpf_printk("push_data succeeded\n");
+//
+//	/*test2*/
+//	// 参数:msg, start_offset, end_offset, flags
+////	if (bpf_msg_pull_data(msg, 0, 6, 0) < 0) {
+////		bpf_printk("pull_data failed\n");
+////		return SK_PASS;
+////	}
+////
+////	bpf_printk("pull_data succeeded, writing header\n");
+////
+////	// 测试直接修改数据
+////	char *data_ptr = (char *)(long)msg->data;
+////	char *data_end_ptr = (char *)(long)msg->data_end;
+////
+////	// 方法1:直接修改指针指向的数据
+////	if (data_ptr + 1 <= data_end_ptr) {
+//////		*data_ptr = 'E';
+////	}
+////	int offset = 5; // 第6个字符,0-based
+////	if (data_ptr + offset + 1 <= data_end_ptr) {
+////		*(data_ptr + offset) = 'b';
+////	}
+//	/*test2*/
+//
+////	data_ptr[6] = 'b';
+////	bpf_printk("Modified data[6] to 'b' using direct pointer\n");
+//
+//
+//	// 验证修改结果
+//	char verify_buf[200] = {};
+//	if (bpf_probe_read_kernel(verify_buf, 200, (void *)data) == 0) {
+//		bpf_printk("Modified HTTP request: %s\n", verify_buf);
+//		bpf_printk("Modified HTTP request: %d\n", (long) msg->data_end - (long) msg->data);
+//	}
+//
+//	__u64 id = bpf_get_current_pid_tgid();
+//	struct read_args *args = bpf_map_lookup_elem(&active_reads, &id);
+//	if (!args) {
+//		return SK_PASS;
+//	}
+//
+//	struct l7_request_key k = {};
+//	k.pid = id >> 32;
+//	k.fd = args->fd;
+//
+//	bpf_printk("Response ------- sk msg: k.pid:%d, k.fd:%d", k.pid, k.fd);
+//
+//
+//	return SK_PASS;
+}
+
+
+SEC("sk_skb/stream_verdict")
+int http_hdr_inject(struct __sk_buff *skb) {
+	// 检查数据包长度
+	if (skb->len < 4) {
+		return SK_PASS;
+	}
+	char buf[100] = {};
+	if (bpf_skb_load_bytes(skb, 0, buf, 100) < 0) {
+		return SK_PASS;
+	}
+//	bpf_printk("stream_verdict: ----%s",buf);
+
+	return SK_PASS;
+	// 获取socket cookie
+	__u32 cookie = bpf_get_socket_cookie(skb);
+	
+	// 检查这个socket是否被标记为需要修改HTTP请求
+	__u32 *flag = bpf_map_lookup_elem(&http_modify_flags, &cookie);
+	if (flag){
+		bpf_printk("%d",*flag);
+	}
+	if (flag && *flag == 1) {
+		// 读取HTTP请求数据
+		char buf[100] = {};
+		if (bpf_skb_load_bytes(skb, 0, buf, 100) < 0) {
+			return SK_PASS;
+		}
+//		bpf_printk("%s",buf);
+
+		
+		// 检查是否是HTTP请求
+		if ((buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T' && buf[3] == ' ') ||
+		    (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') ||
+		    (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T' && buf[3] == ' ') ||
+		    (buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E') ||
+		    (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') ||
+		    (buf[0] == 'O' && buf[1] == 'P' && buf[2] == 'T' && buf[3] == 'I')) {
+			
+//			bpf_printk("=== MODIFYING HTTP REQUEST ===\n");
+//			bpf_printk("Socket cookie: %u\n", cookie);
+//			bpf_printk("Original content: %s\n", buf);
+			
+			// 修改HTTP请求头
+			if (skb->len > 20) {
+				// 在HTTP头部后添加自定义头部
+				char custom_header[] = "X-Custom-Header: Modified\r\n";
+				if (bpf_skb_adjust_room(skb, sizeof(custom_header) - 1, BPF_ADJ_ROOM_NET, 0) == 0) {
+					if (bpf_skb_store_bytes(skb, 20, custom_header, sizeof(custom_header) - 1, 0) == 0) {
+						bpf_printk("Successfully added custom header to HTTP request\n");
+					}
+				}
+			}
+			
+			// 清除标记
+			__u32 clear_flag = 0;
+			bpf_map_update_elem(&http_modify_flags, &cookie, &clear_flag, BPF_ANY);
+			bpf_printk("=== END MODIFYING HTTP REQUEST ===\n");
+		}
+	}
+
+	return SK_PASS;
+}
 //
 //SEC("tracepoint/syscalls/sys_exit_recvfrom")
 //int sys_exit_recvfrom222(struct trace_event_raw_sys_exit_rw__stub* ctx) {

+ 0 - 22
ebpftracer/ebpf/utrace/java/include/java_common.h

@@ -46,28 +46,6 @@
 #define HEADER_LEN 134
 #define JAVA_MAX_BUFFER_SIZE 65536
 
-
-
-struct sock_t {
-	u32 size;
-	u32 header_offset_idx;
-	u32 host_offset_idx;
-	u32 end_str_len;
-	void * payload_char_p;
-	void * payload_len_p;
-	char payload[MAX_LEN];
-	char header_stream[CW_STREAM_HEADER_LEN];
-	char host[30];
-	struct ebpf_proc_info* proc_info;
-};
-
-struct {
-	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-	__type(key, u32);
-	__type(value, struct sock_t);
-	__uint(max_entries, 1);
-} socket_heap SEC(".maps");
-
 struct {
 	__uint(type, BPF_MAP_TYPE_ARRAY);
 	__type(key, u32); // 键类型为int

+ 9 - 8
ebpftracer/ebpf/utrace/java/net/server.probe.bpf.c

@@ -22,12 +22,12 @@
 //	__uint(max_entries, 1);
 //} socket_heap SEC(".maps");
 
-struct {
-	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-	__type(key, int);
-	__type(value, struct apm_span_context);
-	__uint(max_entries, 1);
-} apm_span_context_heap SEC(".maps");
+//struct {
+//	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+//	__type(key, int);
+//	__type(value, struct apm_span_context);
+//	__uint(max_entries, 1);
+//} apm_span_context_heap SEC(".maps");
 
  struct {
 	 __uint(type, BPF_MAP_TYPE_PROG_ARRAY);
@@ -85,11 +85,12 @@ static __inline u32 get_http_header_offset(const char *data, u32 offset, u32 len
 static __inline u32 has_cw_header(const char *data)
 {
 	// cw header 查询
-	long cw_header_native = 0x6361727477630a0dLL; // 小端序下的 "\r\ncwtrac" {0x0d,0x0a,0x63,0x77,0x74,0x72,0x61,0x63}
+	long cw_header_native_lo = 0x6361727477630a0dLL; // 小端序下的 "\r\ncwtrac" {0x0d,0x0a,0x63,0x77,0x74,0x72,0x61,0x63}
+	long cw_header_native_up = 0x6361727477430a0dLL; // 小端序下的 "\r\nCwtrac" {0x0d,0x0a,0x43,0x77,0x74,0x72,0x61,0x63}
 #pragma clang loop unroll(full)
 	for (u32 i = 0; i < 500 - 8; i++) {
 		long tmp_data = *(long long *) (data + i);
-		if (tmp_data == cw_header_native) {
+		if (tmp_data == cw_header_native_lo || tmp_data == cw_header_native_up) {
 			return i+11;
 		}
 	}

+ 205 - 4
ebpftracer/tracer.go

@@ -7,13 +7,15 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"github.com/coroot/coroot-node-agent/utils"
-	"github.com/coroot/coroot-node-agent/utils/try"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
+
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/link"
 	"github.com/cilium/ebpf/perf"
@@ -376,7 +378,7 @@ func (t *Tracer) LinkEbpfProg() error {
 	)
 	for _, programSpec := range t.collectionSpec.Programs {
 		program := t.collection.Programs[programSpec.Name]
-		klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
+		klog.Infof("%s:[%s][%d]", programSpec.SectionName, programSpec.Name, program.Type())
 		if t.DisableL7Tracing() {
 			switch programSpec.Name {
 			case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
@@ -388,6 +390,205 @@ func (t *Tracer) LinkEbpfProg() error {
 			}
 		}
 		switch programSpec.Type {
+		case ebpf.SkSKB:
+			klog.Infof("Processing SkSKB program: %s", programSpec.SectionName)
+			// 对于sk_skb程序,我们需要先检查是否有sockhash map
+			sockhashMap, exists := t.collection.Maps["sockhash"]
+			if !exists {
+				klog.Warnf("sockhash map not found, skipping sk_skb program attachment")
+				continue
+			}
+
+			// 处理stream_verdict程序
+			if programSpec.SectionName == "sk_skb/stream_verdict" {
+				streamVerdictProg, exists := t.collection.Programs["http_hdr_inject"]
+				if !exists {
+					klog.Errorf("http_hdr_inject program not found")
+					continue
+				}
+				klog.Infof("Found sockhash map (FD: %d) and stream_verdict program", sockhashMap.FD())
+
+				// 尝试使用不同的附加方式
+				// 首先尝试使用RawAttachProgram
+				err = link.RawAttachProgram(link.RawAttachProgramOptions{
+					Target:  int(sockhashMap.FD()),
+					Program: streamVerdictProg,
+					Attach:  ebpf.AttachSkSKBStreamVerdict,
+				})
+				if err != nil {
+					klog.Errorf("Failed to attach stream_verdict program using RawAttachProgram: %v", err)
+					// 尝试使用AttachRawLink作为备选方案
+					l, err = link.AttachRawLink(link.RawLinkOptions{
+						Target:  int(sockhashMap.FD()),
+						Program: streamVerdictProg,
+						Attach:  ebpf.AttachSkSKBStreamVerdict,
+					})
+					if err != nil {
+						klog.Errorf("Failed to attach stream_verdict program using AttachRawLink: %v", err)
+						continue
+					}
+				}
+				klog.Infof("Successfully attached stream_verdict program to sockhash map")
+			}
+
+			// 处理stream_parser程序
+			if programSpec.SectionName == "sk_skb/stream_parser" {
+				streamParserProg, exists := t.collection.Programs["http_request_parser"]
+				if !exists {
+					klog.Errorf("http_request_parser program not found")
+					continue
+				}
+				klog.Infof("Found sockhash map (FD: %d) and stream_parser program", sockhashMap.FD())
+
+				// 尝试使用不同的附加方式
+				// 首先尝试使用RawAttachProgram
+				err = link.RawAttachProgram(link.RawAttachProgramOptions{
+					Target:  int(sockhashMap.FD()),
+					Program: streamParserProg,
+					Attach:  ebpf.AttachSkSKBStreamParser,
+				})
+				if err != nil {
+					klog.Errorf("Failed to attach stream_parser program using RawAttachProgram: %v", err)
+					// 尝试使用AttachRawLink作为备选方案
+					l, err = link.AttachRawLink(link.RawLinkOptions{
+						Target:  int(sockhashMap.FD()),
+						Program: streamParserProg,
+						Attach:  ebpf.AttachSkSKBStreamParser,
+					})
+					if err != nil {
+						klog.Errorf("Failed to attach stream_parser program using AttachRawLink: %v", err)
+						continue
+					}
+				}
+				klog.Infof("Successfully attached stream_parser program to sockhash map")
+			}
+
+		case ebpf.SockOps:
+			klog.Infof("Processing SockOps program: %s", programSpec.SectionName)
+			// 获取sockops程序
+			sockopsProg, exists := t.collection.Programs["sockops_cb"]
+			if !exists {
+				klog.Errorf("sockops_cb program not found")
+				continue
+			}
+
+			// 创建cgroup路径
+			cgroupPath := "/sys/fs/cgroup/ebpf-sockops"
+			if err := os.MkdirAll(cgroupPath, 0755); err != nil {
+				klog.Errorf("Failed to create cgroup path: %v", err)
+				continue
+			}
+
+			// 从环境变量获取要监控的PID
+			filterPidStr := os.Getenv("FILTER_PID")
+			if filterPidStr == "" {
+				klog.Warnf("FILTER_PID environment variable not set, using current process")
+				filterPidStr = fmt.Sprint(os.Getpid())
+			}
+
+			// 将指定PID添加到cgroup
+			if err := os.WriteFile(filepath.Join(cgroupPath, "cgroup.procs"), []byte(filterPidStr), 0644); err != nil {
+				klog.Errorf("Failed to add process %s to cgroup: %v", filterPidStr, err)
+				continue
+			}
+			klog.Infof("Added process %s to cgroup for monitoring", filterPidStr)
+
+			// 附加sockops程序到cgroup
+			l, err = link.AttachCgroup(link.CgroupOptions{
+				Path:    cgroupPath,
+				Program: sockopsProg,
+				Attach:  ebpf.AttachCGroupSockOps,
+			})
+			if err != nil {
+				klog.Errorf("Failed to attach sockops program: %v", err)
+				continue
+			}
+			klog.Infof("Successfully attached sockops program to cgroup: %s", cgroupPath)
+
+		case ebpf.CGroupSKB:
+			klog.Infof("Processing CGroupSKB program: %s", programSpec.SectionName)
+			// 处理cgroup/skb程序
+			if programSpec.SectionName == "cgroup/skb" {
+				cgroupSkbProg, exists := t.collection.Programs["http_request_handler"]
+				if !exists {
+					klog.Errorf("http_request_handler program not found")
+					continue
+				}
+
+				// 创建cgroup路径
+				cgroupPath := "/sys/fs/cgroup/ebpf-sockops"
+				if err := os.MkdirAll(cgroupPath, 0755); err != nil {
+					klog.Errorf("Failed to create cgroup path: %v", err)
+					continue
+				}
+
+				// 从环境变量获取要监控的PID
+				filterPidStr := os.Getenv("FILTER_PID")
+				if filterPidStr == "" {
+					klog.Warnf("FILTER_PID environment variable not set, using current process")
+					filterPidStr = fmt.Sprint(os.Getpid())
+				}
+
+				// 将指定PID添加到cgroup
+				if err := os.WriteFile(filepath.Join(cgroupPath, "cgroup.procs"), []byte(filterPidStr), 0644); err != nil {
+					klog.Errorf("Failed to add process %s to cgroup: %v", filterPidStr, err)
+					continue
+				}
+				klog.Infof("Added process %s to cgroup for HTTP request monitoring", filterPidStr)
+
+				// 附加cgroup/skb程序到cgroup
+				l, err = link.AttachCgroup(link.CgroupOptions{
+					Path:    cgroupPath,
+					Program: cgroupSkbProg,
+					Attach:  ebpf.AttachCGroupInetEgress,
+				})
+				if err != nil {
+					klog.Errorf("Failed to attach cgroup/skb program: %v", err)
+					continue
+				}
+				klog.Infof("Successfully attached cgroup/skb program to cgroup: %s", cgroupPath)
+			}
+
+		case ebpf.SkMsg:
+			klog.Infof("Processing SkMsg program: %s", programSpec.SectionName)
+			// 处理sk_msg程序
+			if programSpec.SectionName == "sk_msg" {
+				skMsgProg, exists := t.collection.Programs["sk_msg_handler"]
+				if !exists {
+					klog.Errorf("sk_msg_handler program not found")
+					continue
+				}
+
+				skMsgMap, exists := t.collection.Maps["sk_msg_map"]
+				if !exists {
+					klog.Errorf("sk_msg_map not found")
+					continue
+				}
+
+				klog.Infof("Found sk_msg_map (FD: %d) and sk_msg program", skMsgMap.FD())
+
+				// 附加sk_msg程序到sockmap
+				err = link.RawAttachProgram(link.RawAttachProgramOptions{
+					Target:  int(skMsgMap.FD()),
+					Program: skMsgProg,
+					Attach:  ebpf.AttachSkMsgVerdict,
+				})
+				if err != nil {
+					klog.Errorf("Failed to attach sk_msg program using RawAttachProgram: %v", err)
+					// 尝试使用AttachRawLink作为备选方案
+					l, err = link.AttachRawLink(link.RawLinkOptions{
+						Target:  int(skMsgMap.FD()),
+						Program: skMsgProg,
+						Attach:  ebpf.AttachSkMsgVerdict,
+					})
+					if err != nil {
+						klog.Errorf("Failed to attach sk_msg program using AttachRawLink: %v", err)
+						continue
+					}
+				}
+				klog.Infof("Successfully attached sk_msg program to sockmap")
+			}
+
 		case ebpf.TracePoint:
 			if strings.Contains(programSpec.SectionName, "prog") {
 				continue
@@ -550,7 +751,7 @@ type l7Event struct {
 	InstanceIdFrom      HashByte
 	AppIdFrom           HashByte
 	SpanIdFrom          HashByte
-	TypeFrom          	[1]byte
+	TypeFrom            [1]byte
 }
 
 type SocketDataBufferddd struct {

+ 20 - 7
ebpftracer/tracer/btf_vmlinux.go

@@ -84,15 +84,28 @@ func bpf_table_pre_set_value(collectionSpec *ebpf.CollectionSpec, opts *ebpf.Col
 }
 
 func bpf_table_set_value(collection *ebpf.Collection, mapName string, key uint32, data any) (int, error) {
-	m, ok := collection.Maps[mapName]
-	//for s, m2 := range collection.Maps {
-	//	fmt.Println(s, m2.String())
+	//m, ok := collection.Maps[mapName]
+	////for s, m2 := range collection.Maps {
+	////	fmt.Println(s, m2.String())
+	////}
+	////fmt.Println("bpf_table_set_value", m, mapName, data)
+	//if ok {
+	//	k := make([]byte, 4)                  // Assuming int k size is 4 bytes
+	//	binary.LittleEndian.PutUint32(k, key) // Assuming the key is an integer
+	//	if err := m.Update(k, data, ebpf.UpdateAny); err != nil {
+	//		return ETR_UPDATE_MAP_FAILD, err
+	//	}
+	//} else {
+	//	return ETR_UPDATE_MAP_FAILD, errors.New("cannot find map " + mapName)
 	//}
-	//fmt.Println("bpf_table_set_value", m, mapName, data)
+	//return ETR_OK, nil
+	return bpf_table_set_any_value(collection, mapName, key, data)
+}
+
+func bpf_table_set_any_value(collection *ebpf.Collection, mapName string, key, data any) (int, error) {
+	m, ok := collection.Maps[mapName]
 	if ok {
-		k := make([]byte, 4)                  // Assuming int k size is 4 bytes
-		binary.LittleEndian.PutUint32(k, key) // Assuming the key is an integer
-		if err := m.Update(k, data, ebpf.UpdateAny); err != nil {
+		if err := m.Update(key, data, ebpf.UpdateAny); err != nil {
 			return ETR_UPDATE_MAP_FAILD, err
 		}
 	} else {

+ 66 - 6
ebpftracer/tracer/socket.go

@@ -2,16 +2,19 @@ package tracer
 
 import (
 	"fmt"
-	"github.com/cilium/ebpf"
-	"github.com/cilium/ebpf/btf"
-	"github.com/coroot/coroot-node-agent/utils"
-	. "github.com/coroot/coroot-node-agent/utils/modelse"
-	klog "github.com/sirupsen/logrus"
 	"net"
 	"os"
 	"runtime"
+	"strings"
 	"syscall"
 	"time"
+
+	"github.com/cilium/ebpf"
+	"github.com/cilium/ebpf/btf"
+	"github.com/coroot/coroot-node-agent/flags"
+	"github.com/coroot/coroot-node-agent/utils"
+	. "github.com/coroot/coroot-node-agent/utils/modelse"
+	klog "github.com/sirupsen/logrus"
 )
 
 func init() {
@@ -52,6 +55,9 @@ func MapInsert(collection *ebpf.Collection) {
 
 	// Update protocol filter array
 	update_protocol_filter_array(collection)
+
+	// 从flags配置获取CodeType配置,默认支持Python和Node
+	update_skmsg_header_allowed_to_map(collection)
 }
 
 func insert_output_prog_to_map(collection *ebpf.Collection) {
@@ -109,7 +115,7 @@ func insert_adapt_kern_uid_to_map(collection *ebpf.Collection) {
 	pid := os.Getpid()
 	tid := syscall.Gettid()
 	adaptKernUID := uint64(pid)<<32 | uint64(tid)
-	code, err := bpf_table_set_value(collection, MAP_ADAPT_KERN_UID_NAME, 0, adaptKernUID)
+	code, err := bpf_table_set_value(collection, MAP_ADAPT_KERN_UID_NAME, uint32(0), adaptKernUID)
 	if err != nil || code != ETR_OK {
 		klog.Error(err, code)
 	}
@@ -362,3 +368,57 @@ func SetConstants(collectionSpec *ebpf.CollectionSpec) {
 	// 	fmt.Println("err", err, consts)
 	// }
 }
+
+// parseCodeTypesFromFlags 从flags配置解析CodeType数组
+// 配置格式: "python,nodejs" 或 "python,node" 等
+// 支持的语言标识: python, node, nodejs, java, go, php, dotnet, netcore, c, lua, ruby
+func parseCodeTypesFromFlags() []CodeType {
+	var codeTypes []CodeType
+	configValue := *flags.L4HeaderCodeTypes
+	if configValue == "" {
+		return codeTypes
+	}
+	parts := strings.Split(configValue, ",")
+	for _, part := range parts {
+		part = strings.TrimSpace(strings.ToLower(part))
+		switch part {
+		case "python":
+			codeTypes = append(codeTypes, CodeTypePython)
+		case "node", "nodejs":
+			codeTypes = append(codeTypes, CodeTypeNode)
+		case "java":
+			codeTypes = append(codeTypes, CodeTypeJava)
+		case "go":
+			codeTypes = append(codeTypes, CodeTypeGo)
+		case "php":
+			codeTypes = append(codeTypes, CodeTypePHP)
+		case "dotnet":
+			codeTypes = append(codeTypes, CodeTypeDotNet)
+		case "netcore":
+			codeTypes = append(codeTypes, CodeTypeNetCore)
+		case "c":
+			codeTypes = append(codeTypes, CodeTypeC)
+		case "lua":
+			codeTypes = append(codeTypes, CodeTypeLua)
+		case "ruby":
+			codeTypes = append(codeTypes, CodeTypeRuby)
+		case "nginx":
+			codeTypes = append(codeTypes, CodeTypeNginx)
+		default:
+			klog.Warnf("Unknown code type in L4_HEADER_CODE_TYPES: %s", part)
+		}
+	}
+
+	return codeTypes
+}
+
+func update_skmsg_header_allowed_to_map(collection *ebpf.Collection) {
+	codeTypes := parseCodeTypesFromFlags()
+	klog.Infof("[kernel] set l4 header allowed_to_map %s", codeTypes)
+	for _, codeType := range codeTypes {
+		_, err := bpf_table_set_any_value(collection, MAP_L4_HEADER_CODE_TYPES_NAME, uint16(codeType), uint8(1))
+		if err != nil {
+			klog.Errorf("[kernel] update_skmsg_header_allowed_to_map err: %v]")
+		}
+	}
+}

+ 1 - 0
flags/flags.go

@@ -66,6 +66,7 @@ var (
 
 	HostDirPathPrefix = kingpin.Flag("host-dir-path-prefix", "Set the prefix of path about the mount point of the host directory").Envar("HOST_DIR_PATH_PREFIX").Default("").String()
 	FuseTryMax        = kingpin.Flag("fuse_try_max", "The maximum number of the fuse operation try").Default("3").Envar("FUSE_TRY_MAX").Int()
+	L4HeaderCodeTypes = kingpin.Flag("l4-header", "L4 header code types (e.g., python,node,ruby)").Envar("L4_HEADER_CODE_TYPES").Default("python,node").String()
 	// debug
 	Test = kingpin.Flag("test", "Only test").Default("false").Envar("TEST").Bool()
 )

+ 6 - 1
utils/modelse/bpf_struct.go

@@ -25,7 +25,8 @@ const (
 	MAP_PROGS_JMP_TP_NAME = "__progs_jmp_tp_map"
 	MAP_PROGS_JMP_UP_NAME = "__progs_jmp_up_map"
 
-	MAP_PROC_INFO_MAP_NAME = "proc_info_map"
+	MAP_PROC_INFO_MAP_NAME        = "proc_info_map"
+	MAP_L4_HEADER_CODE_TYPES_NAME = "l4_header_code_types"
 	// This prog is designed to handle data transfer
 	PROGUP                         = "bpf_prog_up__"
 	PROGKP                         = "bpf_prog_kp__"
@@ -201,6 +202,10 @@ type EbpfProcInfo struct {
 	BucketsPtrPos  uint64
 }
 
+type allowedHeaderCodeTypes struct {
+	CodeType uint16
+}
+
 type allowPortBitmap struct {
 	Bitmap [65536 / 8]uint8
 }

+ 14 - 0
utils/modelse/code_type.go

@@ -22,6 +22,7 @@ const (
 	CodeTypeLua        CodeType = 1009
 	CodeTypeJavaC      CodeType = 1010
 	CodeTypeRuby       CodeType = 1012
+	CodeTypeNginx      CodeType = 1019
 )
 
 func (p CodeType) String() string {
@@ -52,6 +53,8 @@ func (p CodeType) String() string {
 		return "JAVA_C"
 	case CodeTypeRuby:
 		return "RUBY"
+	case CodeTypeNginx:
+		return "NGINX"
 	case CodeTypeWaitCheck:
 		return "WAIT_CHECK"
 	case CodeTypeUnknown:
@@ -130,6 +133,8 @@ func (p CodeType) ServiceTypeString() string {
 		return "JAVA_C"
 	case CodeTypeRuby:
 		return "RUBY"
+	case CodeTypeNginx:
+		return "NGINX"
 	case CodeTypeWaitCheck:
 		return "WAIT_CHECK"
 	case CodeTypeUnknown:
@@ -159,6 +164,8 @@ func (p CodeType) WhiteCodeString() string {
 		return "c"
 	case CodeTypeNetCore:
 		return "dc"
+	case CodeTypeNginx:
+		return "nginx"
 	//case CodeTypeNetCoreAot:
 	//	return "NETCORE_AOT"
 	//case CodeTypeLua:
@@ -273,3 +280,10 @@ func (p CodeType) IsJavaCCode() bool {
 	}
 	return false
 }
+
+func (p CodeType) IsNginxCode() bool {
+	if p == CodeTypeNginx {
+		return true
+	}
+	return false
+}