Ver código fonte

Fixed #TASK_QT-9810 完成accept与l7层数据的关联。

rock 1 ano atrás
pai
commit
439a0bf99e

+ 22 - 2
ebpftracer/ebpf/l7/l7.c

@@ -79,6 +79,10 @@ struct l7_event {
     __u32 trace_start;
     __u32 trace_start;
     __u32 trace_end;
     __u32 trace_end;
     __u32 event_count;
     __u32 event_count;
+    __u16 sport;
+    __u16 dport;
+    __u8 saddr[16];
+    __u8 daddr[16];
 	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
 	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
 	unsigned char span_id[APM_SPAN_ID_SIZE];
 	unsigned char span_id[APM_SPAN_ID_SIZE];
 	unsigned char trace_id_from[APM_TRACE_ID_SIZE];
 	unsigned char trace_id_from[APM_TRACE_ID_SIZE];
@@ -426,6 +430,17 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 	    bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
 	    bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
 	    // 清除trace信息
 	    // 清除trace信息
 	    cw_clear_trace(pid, tid, fd);
 	    cw_clear_trace(pid, tid, fd);
+        cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+        struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
+        if (accept_conn) {
+            cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+            e->sport = accept_conn->sport;
+            e->dport = accept_conn->dport;
+            __builtin_memcpy(&e->saddr, &accept_conn->saddr, sizeof(e->saddr));
+            __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
+            // __sync_fetch_and_add(&accept_conn->bytes_sent, total_size);
+            
+        }
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         // 发送事件到用户空间 end
         // 发送事件到用户空间 end
 //        __u64 k_version = load_filter_pid();
 //        __u64 k_version = load_filter_pid();
@@ -447,10 +462,15 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 
 
         // //TODO 4 查询 
         // //TODO 4 查询 
         // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
         // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
-        // struct connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
+        // struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
         // if (accept_conn && !is_tls) {
         // if (accept_conn && !is_tls) {
         //     cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
         //     cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
-        //     __sync_fetch_and_add(&accept_conn->bytes_sent, total_size);
+        //     e.sport = accept_conn.sport;
+        //     e.dport = accept_conn.dport;
+        //     __builtin_memcpy(&e.saddr, &accept_conn.saddr, sizeof(e.saddr));
+        //     __builtin_memcpy(&e.daddr, &accept_conn.daddr, sizeof(e.daddr));
+        //     // __sync_fetch_and_add(&accept_conn->bytes_sent, total_size);
+            
         // }
         // }
         return 0;
         return 0;
     }
     }

+ 24 - 14
ebpftracer/ebpf/tcp/state.c

@@ -85,6 +85,13 @@ struct connection {
     __u64 new_read_time;
     __u64 new_read_time;
 };
 };
 
 
+struct accept_connection {
+    __u16 sport;
+    __u16 dport;
+    __u8 saddr[16];
+    __u8 daddr[16];
+};
+
 struct {
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(key_size, sizeof(struct connection_id));
     __uint(key_size, sizeof(struct connection_id));
@@ -95,7 +102,7 @@ struct {
 struct {
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(key_size, sizeof(struct connection_id));
     __uint(key_size, sizeof(struct connection_id));
-    __uint(value_size, sizeof(struct connection));
+    __uint(value_size, sizeof(struct accept_connection));
     __uint(max_entries, MAX_CONNECTIONS);
     __uint(max_entries, MAX_CONNECTIONS);
 } active_accepts SEC(".maps");
 } active_accepts SEC(".maps");
 
 
@@ -275,19 +282,19 @@ int sys_enter_close(void *ctx) {
         bpf_map_delete_elem(&active_connections, &cid);
         bpf_map_delete_elem(&active_connections, &cid);
     }
     }
     cw_bpf_debug("socket accept socket sys_enter_close accept_Connection before cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
     cw_bpf_debug("socket accept socket sys_enter_close accept_Connection before cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
-    struct connection *acceptConn = bpf_map_lookup_elem(&active_accepts, &cid);
+    struct accept_connection *acceptConn = bpf_map_lookup_elem(&active_accepts, &cid);
     if (acceptConn) {
     if (acceptConn) {
-        struct tcp_event e = {};
-        e.type = EVENT_TYPE_ACCEPT_CLOSE;
-        e.pid = cid.pid;
-        e.fd = cid.fd;
-        e.bytes_sent = acceptConn->bytes_sent;
-        e.bytes_received = acceptConn->bytes_received;
-        e.timestamp = acceptConn->timestamp;
-        bpf_perf_event_output(ctx, &tcp_accept_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+        // struct tcp_event e = {};
+        // e.type = EVENT_TYPE_ACCEPT_CLOSE;
+        // e.pid = cid.pid;
+        // e.fd = cid.fd;
+        // e.bytes_sent = acceptConn->bytes_sent;
+        // e.bytes_received = acceptConn->bytes_received;
+        // e.timestamp = acceptConn->timestamp;
+        // bpf_perf_event_output(ctx, &tcp_accept_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
         bpf_map_delete_elem(&active_accepts, &cid);
         bpf_map_delete_elem(&active_accepts, &cid);
-        cw_bpf_debug("socket accept socket sys_enter_close accept_Connection cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
-        cw_bpf_debug("socket accept socket sys_enter_close accept_Connection cid.bytes_sent=%d, cid.bytes_received=%d\n", e.bytes_sent, e.bytes_received);
+        // cw_bpf_debug("socket accept socket sys_enter_close accept_Connection cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+        // cw_bpf_debug("socket accept socket sys_enter_close accept_Connection cid.bytes_sent=%d, cid.bytes_received=%d\n", e.bytes_sent, e.bytes_received);
     }
     }
 
 
     //TODO 2,增加active_accept 对应的判断,类比234行操作,新增EVENT_TYPE_accept_conn_CLOSE类型
     //TODO 2,增加active_accept 对应的判断,类比234行操作,新增EVENT_TYPE_accept_conn_CLOSE类型
@@ -453,8 +460,11 @@ int tracepoint__sys_exit_accept4(struct sys_exit_accept4_ctx *ctx) {
         cid.pid = pid_tgid >> 32;
         cid.pid = pid_tgid >> 32;
         cid.fd = fd;
         cid.fd = fd;
 
 
-        struct connection conn = {};
-        conn.timestamp = bpf_ktime_get_ns();
+        struct accept_connection conn = {};
+        conn.sport = tuple.sport;
+        conn.dport = tuple.dport;
+        __builtin_memcpy(&conn.saddr, &saddr, sizeof(conn.saddr));
+        __builtin_memcpy(&conn.daddr, &daddr, sizeof(conn.daddr));
         cw_bpf_debug("socket accept update active_accepts before cid.pid=%d, cid.fd=%lld\n", cid.pid, cid.fd);
         cw_bpf_debug("socket accept update active_accepts before cid.pid=%d, cid.fd=%lld\n", cid.pid, cid.fd);
         bpf_map_update_elem(&active_accepts, &cid, &conn, BPF_ANY);
         bpf_map_update_elem(&active_accepts, &cid, &conn, BPF_ANY);
         cw_bpf_debug("socket accept update active_accepts after cid.pid=%d, cid.fd=%lld\n", cid.pid, cid.fd);
         cw_bpf_debug("socket accept update active_accepts after cid.pid=%d, cid.fd=%lld\n", cid.pid, cid.fd);

+ 3 - 0
ebpftracer/l7/l7.go

@@ -3,6 +3,7 @@ package l7
 import (
 import (
 	"strconv"
 	"strconv"
 	"time"
 	"time"
+	"inet.af/netaddr"
 )
 )
 
 
 type Protocol uint8
 type Protocol uint8
@@ -155,6 +156,8 @@ type RequestData struct {
 	SpanId            string
 	SpanId            string
 	StartAt           uint64
 	StartAt           uint64
 	EndAt             uint64
 	EndAt             uint64
+    SAddr          	  netaddr.IPPort
+	DAddr          	  netaddr.IPPort
 	ParentSpanContext struct {
 	ParentSpanContext struct {
 		TraceIdFrom    string
 		TraceIdFrom    string
 		CalledId       string
 		CalledId       string

+ 6 - 0
ebpftracer/tracer.go

@@ -471,6 +471,10 @@ type l7Event struct {
 	TraceStart          uint32
 	TraceStart          uint32
 	TraceEnd            uint32
 	TraceEnd            uint32
 	EventCount          uint32
 	EventCount          uint32
+	Sport				uint16
+    Dport				uint16
+    SAddr          		[16]byte
+	DAddr          		[16]byte
 	AssumedAppId        HashByte
 	AssumedAppId        HashByte
 	SpanId              HashByte
 	SpanId              HashByte
 	TraceIdFrom         HashByte16
 	TraceIdFrom         HashByte16
@@ -704,6 +708,8 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				SpanId:       hex.EncodeToString(v.SpanId[:]),
 				SpanId:       hex.EncodeToString(v.SpanId[:]),
 				StartAt:      v.StartAt,
 				StartAt:      v.StartAt,
 				EndAt:        v.EndtAt,
 				EndAt:        v.EndtAt,
+				SAddr:		  ipPort(v.SAddr,v.Sport),
+				DAddr:		  ipPort(v.DAddr,v.Dport),
 			}
 			}
 			if v.TraceEnd == 1 {
 			if v.TraceEnd == 1 {
 				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
 				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])

+ 3 - 0
tracing/apm_tracing.go

@@ -239,6 +239,9 @@ func (t *Trace) TraceEndEvent(r *l7.RequestData) {
 		//t.span.SetAttributes(attribute.String("server.span_id_from", r.ParentSpanContext.SpanIdFrom))
 		//t.span.SetAttributes(attribute.String("server.span_id_from", r.ParentSpanContext.SpanIdFrom))
 	}
 	}
 
 
+	attr = append(attr, attribute.String("server.src_addr", r.SAddr.String()))
+	attr = append(attr, attribute.String("server.dst_addr", r.DAddr.String()))
+
 	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
 	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
 	t.span.SetAttributes(attr...)
 	t.span.SetAttributes(attr...)
 	t.endReadyEvent(r.EventCount)
 	t.endReadyEvent(r.EventCount)