Ver código fonte

Fixed #TASK_QT-13496 netflow数据支持perfevent发送

rock 1 ano atrás
pai
commit
688e49b85e
2 arquivos alterados com 75 adições e 59 exclusões
  1. 52 43
      ebpftracer/ebpf/netflow/kflowd.bpf.c
  2. 23 16
      ebpftracer/tracer.go

+ 52 - 43
ebpftracer/ebpf/netflow/kflowd.bpf.c

@@ -38,6 +38,12 @@ struct {
     // __type(value, struct RECORD_FS);
 } ringbuf_records SEC(".maps"); 
 
+struct {
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} perfevent_records SEC(".maps");
+
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(max_entries, MAP_RECORDS_MAX);
@@ -151,7 +157,7 @@ static __always_inline cw_net_bool debug_file_is_tp(char *);
 const volatile char         debug[DBG_LEN_MAX];
 
 /* submit tcp or udp socket record to ringbuffer */
-static __always_inline int submit_sock_record(struct SOCK_INFO *sinfo) {
+static __always_inline int submit_sock_record(void* ctx, struct SOCK_INFO *sinfo) {
     if(!sinfo){
         return 0;
     }
@@ -286,16 +292,19 @@ static __always_inline int submit_sock_record(struct SOCK_INFO *sinfo) {
     }
 
     /* submit to ringbuffer */
-    if (bpf_ringbuf_output(&ringbuf_records, r, output_len, 0))
-        // bpf_printk("WARNING: Failed to submit %s socket record to ringbuffer for sock %u", GET_ROLE_STR(sinfo->role),
-        //            sinfo->sock);
-        bpf_printk("bpf_ringbuf_output success\n");
-
+    // if (bpf_ringbuf_output(&ringbuf_records, r, output_len, 0))
+    //     // bpf_printk("WARNING: Failed to submit %s socket record to ringbuffer for sock %u", GET_ROLE_STR(sinfo->role),
+    //     //            sinfo->sock);
+    //     bpf_printk("bpf_ringbuf_output success\n");
+
+    long error = bpf_perf_event_output(ctx, &perfevent_records, BPF_F_CURRENT_CPU, r, sizeof(*r));
+	if (error ==0){
+	}
     return 0;
 }
 
 /* check for expired records */
-static __always_inline void expire_sock_records() {
+static __always_inline void expire_sock_records(void* ctx) {
     struct SOCK_INFO *sq_sinfo;
     struct SOCK_QUEUE sq = {0};
     struct STATS     *s;
@@ -340,7 +349,7 @@ static __always_inline void expire_sock_records() {
                     /* set udp state to close on idle timeout */
                     if (sq_sinfo->proto == IPPROTO_UDP && ts_now - sq.ts > agg_idle_timeout * (u64)1e9)
                         sq_sinfo->state = UDP_CLOSE;
-                    submit_sock_record(sq_sinfo);
+                    submit_sock_record(ctx, sq_sinfo);
                     if (s)
                         s->q_pop_expired++;
                     if (debug_proc(NULL, NULL)) {
@@ -605,7 +614,7 @@ static __always_inline int handle_tcp_event(void *ctx, const struct SOCK_EVENT_I
             }
             /* submit final record and delete closed client and server sockets */
             sinfo->state = tcp_state;
-            submit_sock_record(sinfo);
+            submit_sock_record(ctx, sinfo);
             if (bpf_map_delete_elem(&hash_socks, &key))
                 // bpf_printk("WARNING: Failed to delete %s socket for key %lx and pid %u\n", GET_ROLE_STR(sinfo->role),
                         //    key, sinfo->pid);
@@ -701,7 +710,7 @@ int cw_net_inet_sock_set_state(struct trace_event_raw_inet_sock_set_state *args)
         return 0;
 
     struct SOCK_EVENT_INFO event = {0, 0, family, 0, 0, (uint64_t)args, 0, "inet_sock_set_state"};
-    handle_tcp_event(NULL, &event);
+    handle_tcp_event(args, &event);
 
     return 0;
 }
@@ -738,7 +747,7 @@ int kretprobe_inet_csk_accept(struct pt_regs *ctx){
 }
 
 /* handle tcp packet */
-static __always_inline int handle_tcp_packet(struct cw_net_sock *sock, struct sk_buff *skb, cw_net_bool isrx) {
+static __always_inline int handle_tcp_packet(void* ctx, struct cw_net_sock *sock, struct sk_buff *skb, cw_net_bool isrx) {
     KPROBE_SWITCH(MONITOR_SOCK);
     struct cw_net_task_struct *task = (struct cw_net_task_struct *)bpf_get_current_task();
     struct SOCK_INFO   *sinfo;
@@ -751,7 +760,7 @@ static __always_inline int handle_tcp_packet(struct cw_net_sock *sock, struct sk
     __u32               zero = 0;
 
     /* clean expired records */
-    expire_sock_records();
+    expire_sock_records(ctx);
 
     /* try to get sock from buffer if zero */
     if (!sock) {
@@ -999,7 +1008,7 @@ int kprobe_tcp_v4_do_rcv(struct pt_regs *ctx) {
     // struct cw_net_sock *sock = (struct cw_net_sock *)ctx->di;
     struct sk_buff *skb = (struct sk_buff *)PT_REGS_PARM2(ctx);
     // struct sk_buff *skb = (struct sk_buff *)ctx->si;
-    handle_tcp_packet(sock, skb, true);
+    handle_tcp_packet(ctx, sock, skb, true);
 
     return 0;
 }
@@ -1013,7 +1022,7 @@ int kprobe_tcp_v6_do_rcv(struct pt_regs *ctx) {
     // struct cw_net_sock *sock = (struct cw_net_sock *)ctx->di;
     struct sk_buff *skb = (struct sk_buff *)PT_REGS_PARM2(ctx);
     // struct sk_buff *skb = (struct sk_buff *)ctx->si;
-    handle_tcp_packet(sock, skb, true);
+    handle_tcp_packet(ctx, sock, skb, true);
     return 0;
 }
 
@@ -1035,7 +1044,7 @@ int kprobe_ip_local_out(struct pt_regs *ctx) {
     __u16 proto = sc_proto;
     if (proto != IPPROTO_TCP)
         return 0;
-    handle_tcp_packet(sock, skb, false);
+    handle_tcp_packet(ctx, sock, skb, false);
 
     return 0;
 }
@@ -1058,7 +1067,7 @@ int kprobe_ip6_xmit(struct pt_regs *ctx){
     __u16 proto = sc_proto;
     if (proto != IPPROTO_TCP)
         return 0;
-    handle_tcp_packet(sock, skb, false);
+    handle_tcp_packet(ctx, sock, skb, false);
 
     return 0;
 }
@@ -1169,7 +1178,7 @@ static __always_inline int handle_udp_event(void *ctx, const struct SOCK_EVENT_I
         }
     }
     /* clean expired records */
-    expire_sock_records();
+    expire_sock_records(ctx);
 
     /* lookup and update socket */
     // key = KEY_SOCK(BPF_CORE_READ(sock, __sk_common.skc_hash));
@@ -1229,32 +1238,32 @@ static __always_inline int handle_udp_event(void *ctx, const struct SOCK_EVENT_I
         // sinfo->state = BPF_CORE_READ(sock, __sk_common.skc_state);
 
         /* add application data (dns) */
-        // if (is_app_port[APP_DNS]) {
-        //     if (sinfo->app_msg.cnt < APP_MSG_MAX) {
-        //         num = sinfo->app_msg.cnt++;
-        //         sinfo->app_msg.type = APP_DNS;
-        //         sinfo->app_msg.ts[num] = bpf_ktime_get_ns();
-        //         sinfo->app_msg.len[num] = data_len;
-        //         sinfo->app_msg.isrx[num] = isrx;
-        //         bpf_probe_read_kernel(sinfo->app_msg.data[num], MIN((__u16)data_len, sizeof(sinfo->app_msg.data[num])),
-        //                               dnshdr);
-        //         /* export record on max application messages */
-        //         if (sinfo->app_msg.cnt >= APP_MSG_MAX) {
-        //             // submit_sock_record(sinfo);
-        //             if (bpf_map_delete_elem(&hash_socks, &key))
-        //                 // bpf_printk("WARNING: Failed to delete %s socket for key %lx and pid %u\n",
-        //                 //            GET_ROLE_STR(sinfo->role), key, sinfo->pid);
-        //                 bpf_printk("bpf_map_delete_elem failed\n");
-        //             // else if (debug_proc(sinfo->comm, NULL))
-        //             else
-        //                 // bpf_printk("Submitted and deleted %s socket due to app message limit for key %lx and pid %u\n",
-        //                 //            GET_ROLE_STR(sinfo->role), key, sinfo->pid);
-        //                 bpf_printk("Submitted and deleted\n");
-        //             return 0;
-        //         }
-        //     } else
-        //         bpf_printk("WARNING: Failed to capture dns application message #%u\n", sinfo->app_msg.cnt);
-        // }
+        if (is_app_port[APP_DNS]) {
+            if (sinfo->app_msg.cnt < APP_MSG_MAX) {
+                num = sinfo->app_msg.cnt++;
+                sinfo->app_msg.type = APP_DNS;
+                sinfo->app_msg.ts[num] = bpf_ktime_get_ns();
+                sinfo->app_msg.len[num] = data_len;
+                sinfo->app_msg.isrx[num] = isrx;
+                bpf_probe_read_kernel(sinfo->app_msg.data[num], MIN((__u16)data_len, sizeof(sinfo->app_msg.data[num])),
+                                      dnshdr);
+                /* export record on max application messages */
+                if (sinfo->app_msg.cnt >= APP_MSG_MAX) {
+                    submit_sock_record(ctx, sinfo);
+                    if (bpf_map_delete_elem(&hash_socks, &key))
+                        // bpf_printk("WARNING: Failed to delete %s socket for key %lx and pid %u\n",
+                        //            GET_ROLE_STR(sinfo->role), key, sinfo->pid);
+                        bpf_printk("bpf_map_delete_elem failed\n");
+                    // else if (debug_proc(sinfo->comm, NULL))
+                    else
+                        // bpf_printk("Submitted and deleted %s socket due to app message limit for key %lx and pid %u\n",
+                        //            GET_ROLE_STR(sinfo->role), key, sinfo->pid);
+                        bpf_printk("Submitted and deleted\n");
+                    return 0;
+                }
+            } else
+                bpf_printk("WARNING: Failed to capture dns application message #%u\n", sinfo->app_msg.cnt);
+        }
         if (!bpf_map_update_elem(&hash_socks, &key, sinfo, BPF_ANY)) {
             // if (debug_proc(sinfo->comm, NULL))
                 // bpf_printk("Updated %s socket %lx for pid %u", GET_ROLE_STR(sinfo->role), key, pid);

+ 23 - 16
ebpftracer/tracer.go

@@ -124,6 +124,7 @@ const (
 	perfMapTypeL7Events     perfMapType = 4
 	perfMapTypeSocketEvents perfMapType = 5
 	perfMapTypeEventQueue   perfMapType = 6
+	perfMapTypeL4Events     perfMapType = 7
 )
 
 type ringMapType uint8
@@ -299,29 +300,33 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		{name: "ringbuf_records", typ: ringMapTypeL4Events},
 	}
 	//TODO rock 添加ringbuffer 处理函数
-	fmt.Println("ringMaps start --")
-			for _, pm := range ringMaps {
-				fmt.Println(pm.name)
-				m, ok := t.collection.Maps[pm.name]
-				if ok {
-					r, err := ringbuf.NewReader(m) // new Ring_Buffer_Reader......
-					if err != nil {
-						t.Close()
-						return fmt.Errorf("failed to create ebpf reader: %w", err)
-					}
-					t.ringBufReaders[pm.name] = r
-					// event监听
-					go runRingBuffEventsReader(pm.name, r, ch, pm.typ) // ring_buff_reader...
-				}
-
+	if !t.DisableL7Tracing() {
+		fmt.Println("ringMaps start --")
+		for _, pm := range ringMaps {
+			fmt.Println(pm.name)
+		m, ok := t.collection.Maps[pm.name]
+		if ok {
+			r, err := ringbuf.NewReader(m) // new Ring_Buffer_Reader......
+			if err != nil {
+				t.Close()
+				return fmt.Errorf("failed to create ebpf reader: %w", err)
 			}
-	fmt.Println("ringMaps end --")
+			t.ringBufReaders[pm.name] = r
+			// event监听
+			go runRingBuffEventsReader(pm.name, r, ch, pm.typ) // ring_buff_reader...
+		}
+
+		}
+		fmt.Println("ringMaps end --")
+	}
+	
 
 	fmt.Println(len(collectionSpec.Programs))
 	fmt.Println(len(c.Programs))
 	tracer.MapInsert(c)
 	if !t.DisableL7Tracing() {
 		perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
+		perfMaps = append(perfMaps, perfMap{name: "perfevent_records", typ: perfMapTypeL4Events, perCPUBufferSizePages: 32})
 	}
 	perfMaps = append(perfMaps, perfMap{name: tracer.MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
 	fmt.Println("perfMaps start --")
@@ -802,6 +807,8 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 
 			//fmt.Println(string(v.Data))
 			//continue
+		case perfMapTypeL4Events:
+			klog.Infoln("get one data of perfMapTypeL4Events")
 		case perfMapTypeL7Events:
 			v := &l7Event{}
 			reader := bytes.NewBuffer(rec.RawSample)