Browse Source

Feature #TASK_QT-18250 容器化&golang适配

Carl 1 năm trước cách đây
mục cha
commit
4bb785d861

+ 1 - 9
Dockerfile

@@ -1,21 +1,13 @@
 FROM ubuntu:22.04
-
 ARG EUSPACE_BASE_PATH=/opt/cloudwise/apm/euspace
 RUN mkdir -p $EUSPACE_BASE_PATH
-
 #拷贝安装目录结构
 ADD ./dist/package_dir/  $EUSPACE_BASE_PATH/
-
 ARG EUSPACE_BIN_PATH=$EUSPACE_BASE_PATH/bin
-
 # 拷贝euspace可执行文件,make直接生成到dist/package_dir/bin/euspace
 #COPY ./euspace $EUSPACE_BIN_PATH/
-
 # 设置工作目录
 WORKDIR $EUSPACE_BIN_PATH
-
 # 设置PATH变量
 ENV PATH=$PATH:$EUSPACE_BIN_PATH
-
-ENTRYPOINT ["euspace"]
-
+ENTRYPOINT ["euspace"]

+ 1 - 1
containers/container.go

@@ -633,7 +633,7 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
 			klog.Warningln(err)
 			return
 		}
-		klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
+		//klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
 		details.NsIPs = ips
 	}
 }

+ 13 - 7
containers/container_apm.go

@@ -115,7 +115,9 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		if r.TraceStart == TRACE_STATUS {
 			// klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
 			trace, err := c.getOrInitTrace(r.TraceId)
-			klog.Debugf("payload:[%s]", r.Payload)
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
+			}
 			if err == nil {
 				method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
 				ip, _ := netaddr.ParseIP(sn)
@@ -161,6 +163,10 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	//trace := tracing.NewTrace(string(c.id), conn.ActualDest)
 	switch r.Protocol {
 	case l7.ProtocolHTTP:
+		if c.AppInfo.AppName != "" {
+			klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
+		}
+
 		stats.observe(r.Status.Http(), "", r.Duration)
 	case l7.ProtocolHTTP2:
 		if conn.http2Parser == nil {
@@ -644,10 +650,10 @@ func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int)
 			}
 			// 判断规则
 			if strings.Contains(cmdline, ruleVal) {
-				if !codeType.IsJvmCode() {
-					klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
-					return false, 0
-				}
+				//if !codeType.IsJvmCode() {
+				//	klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
+				//	return false, 0
+				//}
 				c.WhiteSettingInfo.AppName = setting.AppName
 				c.WhiteSettingInfo.Filters = setting.Filters
 				klog.WithField("pid", pid).
@@ -716,7 +722,7 @@ func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachT
 		}
 		//delete the proc info form proc_info_map(for kernel) when the uprobe detached
 		if err := tracer.DelKProcInfo(pid); err != nil {
-			return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d,detachType:%d", pid, detachType)
+			return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
 		} else {
 			klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
 			c.AppInfo.EBPFProcInfo = nil
@@ -782,7 +788,7 @@ func (c *Container) getRootfs() string {
 }
 
 func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
-	if c.AppInfo.AppName != "" {
+	if c != nil && c.AppInfo.AppName != "" {
 		detail := AppStatusInfo{
 			Pid:        pid,
 			ProcName:   c.containerName,

+ 1 - 1
containers/registry.go

@@ -448,7 +448,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					//fmt.Println("EventTypeL7Request", e.Pid, c.Isl7AttachSuccess())
 					//a, _ := json.Marshal(e.L7Request)
 					//fmt.Println("EventTypeL7Request", e.Pid, string(a))
-					klog.Debugln("Payload:", string(e.L7Request.Payload))
+					//klog.Debugln("Payload:", string(e.L7Request.Payload))
 					ip2fqdn := c.onL7RequestApm(e.Pid, e.Fd, e.Timestamp, e.L7Request)
 					r.ip2fqdnLock.Lock()
 					for ip, fqdn := range ip2fqdn {

+ 13 - 4
ebpftracer/ebpf/l7/apm_trace.c

@@ -282,8 +282,12 @@ struct apm_trace_info_t *get_trace_info_by_fd(__u32 pid, __u32 fd) {
 static __inline __attribute__((__always_inline__))
 void cw_save_parent_tracking_span(struct apm_span_context *sc) {
 	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
-	long err = 0;
-	err = bpf_map_update_elem(&apm_parent_span_context_map, &trace_key, sc, BPF_ANY);
+//	__u64 pid_tgid = bpf_get_current_pid_tgid();
+//	cw_bpf_debug("[pid:%d][goid:%d] save psc header", (__u32) (pid_tgid >> 32), trace_key.goid);
+//	for (int i = 0; i < APM_APP_ID_SIZE; i++) {
+//		cw_bpf_debug("trace_enter_write - span_id_from = %02x", sc->app_id[i]);
+//	}
+	long err = bpf_map_update_elem(&apm_parent_span_context_map, &trace_key, sc, BPF_ANY);
 	if (err != 0) {
 		cw_bpf_debug("Failed to update tracked_spans map: %ld", err);
 		return;
@@ -389,8 +393,13 @@ void cw_save_current_tracking_span(struct apm_span_context *sc) {
 
 static __inline __attribute__((__always_inline__))
 struct apm_span_context *cw_get_current_tracking_span(struct apm_trace_info_t *trace_info) {
-	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
-//	bpf_printk("[get cw sc]%d goid:%d",trace_key.goid,get_current_goroutine());
+	struct apm_trace_key_t trace_key = {0};
+	if (trace_info){
+		trace_key = trace_info->trace_key;
+	}
+//	bpf_printk("[get cw sc]%d",trace_key.goid);
+//	bpf_printk("[get cw sc2]%d",trace_info->trace_key.goid);
+
 	struct apm_span_context *apm_sc = {0};
 	struct apm_span_context *span_contexts = bpf_map_lookup_elem(&apm_current_span_context_map, &trace_key);
 //	bpf_printk("-------");

+ 6 - 3
ebpftracer/ebpf/l7/l7.c

@@ -427,13 +427,14 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 	    // parent sc
 		struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(start_trace_info->trace_key);
 	    if(cw_psc){
+//		    bpf_printk("[pid:%d][goid:%d] find header",pid,start_trace_info->trace_key.goid);
 		    cw_copy_byte_arrays(cw_psc->trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
 		    cw_copy_byte_arrays(cw_psc->assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
 		    cw_copy_byte_arrays(cw_psc->instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
 		    cw_copy_byte_arrays(cw_psc->app_id, e->app_id_from, APM_APP_ID_SIZE);
 		    cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
-//		    for (int i = 0; i < APM_TRACE_ID_SIZE; i++) {
-//			    cw_bpf_debug("trace_enter_write - span_id_from = %02x", e->span_id_from[i]);
+//		    for (int i = 0; i < APM_APP_ID_SIZE; i++) {
+//			    bpf_printk("trace_enter_write - span_id_from = %02x", e->app_id_from[i]);
 //		    }
 	    }
         struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
@@ -869,7 +870,9 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
 
         //bpf_tail_call PROGUP(l7_http_request)
         cw_bpf_debug("======== PROG_DATA_L7_HTTP_TRACE_ID_UP_IDX ========== __KERNEL_FROM < 512 pid:[%d] ",tid);
-        bpf_tail_call(ctx, &NAME(progs_jmp_tp_map), PROG_DATA_L7_HTTP_TRACE_ID_TP_IDX);
+	    if (proc_info->code_type != CodeTypeGo) {
+		    bpf_tail_call(ctx, &NAME(progs_jmp_tp_map), PROG_DATA_L7_HTTP_TRACE_ID_TP_IDX);
+	    }
          return 0;
     }
 

+ 174 - 62
ebpftracer/ebpf/utrace/go/include/alloc.h

@@ -35,30 +35,46 @@ struct
 //    __uint(pinning, LIBBPF_PIN_BY_NAME);
 } alloc_map SEC(".maps");
 
+struct
+{
+	__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
+	__type(key, u64);
+	__type(value, u64);
+	__uint(max_entries, MAX_ENTRIES);
+//    __uint(pinning, LIBBPF_PIN_BY_NAME);
+} proc_alloc_map SEC(".maps");
+
 static __always_inline u64 get_area_start(u64 start_addr,u64 end_addr)
 {
-    u32 k0 = 0;
+	u32 k0 = 0;
 	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
 	if (!trace_conf) {
-        return 0;
+		return 0;
+	}
+	s64 partition_size = (end_addr - start_addr) / trace_conf->total_cpus;
+	u32 current_cpu = bpf_get_smp_processor_id();
+	s32 start_index = 0;
+
+	u64 alloc_map_index = ((u64)(bpf_get_current_pid_tgid() >> 32) << 32) | start_index;
+	u64 *start = (u64 *)bpf_map_lookup_elem(&proc_alloc_map, &alloc_map_index);
+	if (start == NULL || *start == 0)
+	{
+//	    bpf_printk("start == NULL || *start == 0");
+		u64 current_start_addr = start_addr + (partition_size * current_cpu);
+		bpf_map_update_elem(&proc_alloc_map, &alloc_map_index, &current_start_addr, BPF_ANY);
+		return current_start_addr;
+	}
+	else
+	{
+//	    bpf_printk("else return");
+		return *start;
 	}
-    s64 partition_size = (end_addr - start_addr) / trace_conf->total_cpus;
-    u32 current_cpu = bpf_get_smp_processor_id();
-    s32 start_index = 0;
-    u64 *start = (u64 *)bpf_map_lookup_elem(&alloc_map, &start_index);
-    if (start == NULL || *start == 0)
-    {
-        u64 current_start_addr = start_addr + (partition_size * current_cpu);
-        bpf_map_update_elem(&alloc_map, &start_index, &current_start_addr, BPF_ANY);
-        return current_start_addr;
-    }
-    else
-    {
-        return *start;
-    }
 }
 
-static __always_inline u64 get_area_end(u64 start,u64 start_addr,u64 end_addr)
+
+
+
+/*static __always_inline u64 get_area_end(u64 start,u64 start_addr,u64 end_addr)
 {
     u32 k0 = 0;
 	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
@@ -78,8 +94,33 @@ static __always_inline u64 get_area_end(u64 start,u64 start_addr,u64 end_addr)
     {
         return *end;
     }
+}*/
+
+static __always_inline u64 get_area_end(u64 start,u64 start_addr,u64 end_addr)
+{
+	u32 k0 = 0;
+	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
+	if (!trace_conf) {
+		return 0;
+	}
+	s64 partition_size = (end_addr - start_addr) / trace_conf->total_cpus;
+	s32 end_index = 1;
+	u64 alloc_map_index = ((u64)(bpf_get_current_pid_tgid() >> 32) << 32) | end_index;
+
+	u64 *end = (u64 *)bpf_map_lookup_elem(&proc_alloc_map, &alloc_map_index);
+	if (end == NULL || *end == 0)
+	{
+		u64 current_end_addr = start + partition_size;
+		bpf_map_update_elem(&proc_alloc_map, &alloc_map_index, &current_end_addr, BPF_ANY);
+		return current_end_addr;
+	}
+	else
+	{
+		return *end;
+	}
 }
 
+
 static __always_inline s32 bound_number(s32 num, s32 min, s32 max)
 {
     if (num < min)
@@ -156,12 +197,80 @@ static __always_inline void *write_target_data(void *data, s32 size)
     }
 }
 
+//static __always_inline void *cw_write_target_data(void *data, s32 size, struct ebpf_proc_info* info)
+//{
+//    if (!data || data == NULL)
+//    {
+//	    bpf_printk("163 err");
+//        return NULL;
+//    }
+//	u64 start_from_proc;
+//	u64 end_from_proc;
+////	__u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
+////	struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+////	if (!info) {
+////		return NULL;
+////	}
+//	start_from_proc = info->start_addr;
+//	end_from_proc = info->end_addr;
+//
+//	u64 start = get_area_start(start_from_proc, end_from_proc);
+//    u64 end = get_area_end(start,start_from_proc,end_from_proc);
+//    if (end - start < size)
+//    {
+//        cw_bpf_debug("reached end of CPU memory block, going to the start again");
+//        s32 start_index = 0;
+//        bpf_map_delete_elem(&alloc_map, &start_index);
+//        start = get_area_start(start_from_proc, end_from_proc);
+//    }
+//
+//    void *target = (void *)start;
+//    size = bound_number(size, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+//    u64 page_offset = (u64)target & 0xFFF;
+//    u64 dist_to_next_page = 4096 - page_offset;
+//    if (dist_to_next_page < size)
+//    {
+//        target += dist_to_next_page;
+//    }
+//    u64 target_u = (u64)target;
+//    if (target_u > end_from_proc || target_u < start_from_proc) {
+//	    cw_bpf_debug("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
+////	    bpf_printk("no target_u:%llu start:%llu end:%llu", target_u, start_from_proc, end_from_proc);
+//	    bpf_printk("197 err");
+//        return NULL;
+//    }
+//
+//    long success = bpf_probe_write_user(target, data, size);
+//    if (success == 0)
+//    {
+//        s32 start_index = 0;
+//        // Update the start position of this chunk, taking into account possible adjustments
+//        // we made to be page aligned
+//        u64 updated_start = target_u + size;
+//
+//        // align updated_start to 8 bytes
+//        if (updated_start % 8 != 0) {
+//            updated_start += 8 - (updated_start % 8);
+//        }
+//
+//        bpf_map_update_elem(&alloc_map, &start_index, &updated_start, BPF_ANY);
+//        return target;
+//    }
+//    else
+//    {
+//        bpf_printk("failed to write to userspace, error code: %d, addr: %lx, size: %d", success, target, size);
+//        return NULL;
+//    }
+//}
+
+
 static __always_inline void *cw_write_target_data(void *data, s32 size, struct ebpf_proc_info* info)
 {
-    if (!data || data == NULL)
-    {
-        return NULL;
-    }
+	if (!data || data == NULL)
+	{
+		bpf_printk("163 err");
+		return NULL;
+	}
 	u64 start_from_proc;
 	u64 end_from_proc;
 //	__u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
@@ -173,50 +282,53 @@ static __always_inline void *cw_write_target_data(void *data, s32 size, struct e
 	end_from_proc = info->end_addr;
 
 	u64 start = get_area_start(start_from_proc, end_from_proc);
-    u64 end = get_area_end(start,start_from_proc,end_from_proc);
-    if (end - start < size)
-    {
-        cw_bpf_debug("reached end of CPU memory block, going to the start again");
-        s32 start_index = 0;
-        bpf_map_delete_elem(&alloc_map, &start_index);
-        start = get_area_start(start_from_proc, end_from_proc);
-    }
-
-    void *target = (void *)start;
-    size = bound_number(size, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
-    u64 page_offset = (u64)target & 0xFFF;
-    u64 dist_to_next_page = 4096 - page_offset;
-    if (dist_to_next_page < size)
-    {
-        target += dist_to_next_page;
-    }
-    u64 target_u = (u64)target;
-    if (target_u > end_from_proc || target_u < start_from_proc) {
-	    cw_bpf_debug("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
-        return NULL;
-    }
+	u64 end = get_area_end(start,start_from_proc,end_from_proc);
+	s32 start_index = 0;
+	u64 alloc_map_index = ((u64)(bpf_get_current_pid_tgid() >> 32) << 32) | start_index;
+	if (end - start < size)
+	{
+		cw_bpf_debug("reached end of CPU memory block, going to the start again");
+		bpf_map_delete_elem(&proc_alloc_map, &alloc_map_index);
+		start = get_area_start(start_from_proc, end_from_proc);
+	}
 
-    long success = bpf_probe_write_user(target, data, size);
-    if (success == 0)
-    {
-        s32 start_index = 0;
-        // Update the start position of this chunk, taking into account possible adjustments
-        // we made to be page aligned
-        u64 updated_start = target_u + size;
+	void *target = (void *)start;
+	size = bound_number(size, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+	u64 page_offset = (u64)target & 0xFFF;
+	u64 dist_to_next_page = 4096 - page_offset;
+	if (dist_to_next_page < size)
+	{
+		target += dist_to_next_page;
+	}
+	u64 target_u = (u64)target;
+	if (target_u > end_from_proc || target_u < start_from_proc) {
+		cw_bpf_debug("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
+//	    bpf_printk("no target_u:%llu start:%llu end:%llu", target_u, start_from_proc, end_from_proc);
+		bpf_printk("197 err");
+		return NULL;
+	}
+//	bpf_printk("ok target_u:%llu start:%llu end:%llu", target_u, start, end);
 
-        // align updated_start to 8 bytes
-        if (updated_start % 8 != 0) {
-            updated_start += 8 - (updated_start % 8);
-        }
+	long success = bpf_probe_write_user(target, data, size);
+	if (success == 0)
+	{
+//        s32 start_index = 0;
+		// Update the start position of this chunk, taking into account possible adjustments
+		// we made to be page aligned
+		u64 updated_start = target_u + size;
 
-        bpf_map_update_elem(&alloc_map, &start_index, &updated_start, BPF_ANY);
-        return target;
-    }
-    else
-    {
-        bpf_printk("failed to write to userspace, error code: %d, addr: %lx, size: %d", success, target, size);
-        return NULL;
-    }
+		// align updated_start to 8 bytes
+		if (updated_start % 8 != 0) {
+			updated_start += 8 - (updated_start % 8);
+		}
+		bpf_map_update_elem(&proc_alloc_map, &alloc_map_index, &updated_start, BPF_ANY);
+		return target;
+	}
+	else
+	{
+		bpf_printk("failed to write to userspace, error code: %d, addr: %lx, size: %d", success, target, size);
+		return NULL;
+	}
 }
 
 #endif

+ 12 - 3
ebpftracer/ebpf/utrace/go/net/client.probe.bpf.c

@@ -12,6 +12,8 @@
 #define MAX_METHOD_SIZE 10
 #define MAX_CONCURRENT 50
 
+#define CLIENT_MAP_KEY 1
+
 struct http_request_t {
     BASE_SPAN_PROPERTIES
     char host[MAX_HOSTNAME_SIZE];
@@ -62,6 +64,7 @@ struct
 // volatile const u64 request_proto_pos;
 
 static __always_inline long inject_header(void* headers_ptr, struct span_context* propagated_ctx) {
+	bpf_printk("11111111111111");
     // Read the key-value count - this field must be the first one in the hmap struct as documented in src/runtime/map.go
     u64 curr_keyvalue_count = 0;
     long res = bpf_probe_read_user(&curr_keyvalue_count, sizeof(curr_keyvalue_count), headers_ptr);
@@ -78,11 +81,13 @@ static __always_inline long inject_header(void* headers_ptr, struct span_context
 
     // Get pointer to temp bucket struct we store in a map (avoiding large local variable on the stack)
     // Performing read-modify-write on the bucket
-    u32 map_id = 0;
+    u32 map_id = 1;
     struct map_bucket *bucket_map_value = bpf_map_lookup_elem(&golang_mapbucket_storage_map, &map_id);
     if (!bucket_map_value) {
         return -1;
     }
+	__builtin_memset(bucket_map_value, 0, sizeof(struct map_bucket));
+
     __u64 pid_tgid = bpf_get_current_pid_tgid();
 	__u32 tgid = pid_tgid >> 32;
 	struct ebpf_proc_info *proc_info =
@@ -96,6 +101,8 @@ static __always_inline long inject_header(void* headers_ptr, struct span_context
     void *bucket_ptr = 0; // The actual pointer to the buckets
 
     if (curr_keyvalue_count == 0) {
+	    bpf_printk("curr_keyvalue_count == 0");
+
         // No key-value pairs in the Go map, need to "allocate" memory for the user
         bucket_ptr = write_target_data(bucket_map_value, sizeof(struct map_bucket));
         if (bucket_ptr == NULL) {
@@ -117,6 +124,7 @@ static __always_inline long inject_header(void* headers_ptr, struct span_context
     }
 
     u8 bucket_index = curr_keyvalue_count & 0x7;
+	bpf_printk("bucket_index:%d",bucket_index);
 
     char traceparent_tophash = 0xee;
     bucket_map_value->tophash[bucket_index] = traceparent_tophash;
@@ -413,11 +421,12 @@ static __always_inline long cw_inject_header_half(void* headers_ptr, char * head
 
     // Get pointer to temp bucket struct we store in a map (avoiding large local variable on the stack)
     // Performing read-modify-write on the bucket
-    u32 map_id = 0;
+    u32 map_id = CLIENT_MAP_KEY;
     struct map_bucket *bucket_map_value = bpf_map_lookup_elem(&golang_mapbucket_storage_map, &map_id);
     if (!bucket_map_value) {
         return -1;
     }
+	__builtin_memset(bucket_map_value, 0, sizeof(struct map_bucket));
 //    __u64 pid_tgid = bpf_get_current_pid_tgid();
 //	__u32 tgid = pid_tgid >> 32;
 //	struct ebpf_proc_info *proc_info =
@@ -715,7 +724,7 @@ PROGUP(go_update_header)(struct pt_regs *ctx) {
 //	if (!tail_calls_context){
 //		return -1;
 //	}
-	u32 map_id = 0;
+	u32 map_id = CLIENT_MAP_KEY;
 
 	struct map_bucket *bucket_map_value = bpf_map_lookup_elem(&golang_mapbucket_storage_map, &map_id);
 	if (!bucket_map_value) {

+ 11 - 5
ebpftracer/ebpf/utrace/go/net/server.probe.bpf.c

@@ -57,7 +57,7 @@ struct {
 	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
 	__uint(key_size, sizeof(u32));
 	__uint(value_size, sizeof(struct map_bucket));
-	__uint(max_entries, 1);
+	__uint(max_entries, MAX_CONCURRENT);
 } golang_mapbucket_storage_map SEC(".maps");
 
 struct {
@@ -287,6 +287,8 @@ static __always_inline struct map_bucket *get_map_bucket(void *headers_ptr_ptr)
 	if (!map_value) {
 		return NULL;
 	}
+	__builtin_memset(map_value, 0, sizeof(struct map_bucket));
+
 	for (u32 j = 0; j < 8; j++) {
 		if (j >= bucket_count) {
 			break;
@@ -381,6 +383,7 @@ int uprobe_HandlerFunc_ServeHTTP(struct pt_regs *ctx) {
 
 	struct http_server_span_t *http_server_span = &uprobe_data->span;
 	http_server_span->start_time = bpf_ktime_get_ns();
+//	return 1;
 
 	// Propagate context
 	void *req_ptr = get_argument(ctx, 4);
@@ -391,13 +394,14 @@ int uprobe_HandlerFunc_ServeHTTP(struct pt_regs *ctx) {
 	}
 
 	char * traceparent_header_value = get_header_val_off(map_bucket_p);
+	struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&cw_parent_span_context_storage_map, &map_id);
+	if (!cw_parent_span_context) {
+		return 0;
+	}
+	__builtin_memset(cw_parent_span_context, 0, sizeof(struct apm_span_context));
 
 	if (traceparent_header_value != NULL) {
 		cw_bpf_debug("traceparent_header_value != NULL");
-		struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&cw_parent_span_context_storage_map, &map_id);
-		if (!cw_parent_span_context) {
-			return 0;
-		}
 
 		cw_string_to_span_context(traceparent_header_value, cw_parent_span_context);
 		// found parent context in http headers
@@ -415,6 +419,7 @@ int uprobe_HandlerFunc_ServeHTTP(struct pt_regs *ctx) {
 		if (!cw_parent_span_context) {
 			return 0;
 		}
+
 		// struct apm_span_context context = {};
 		generate_random_bytes(cw_parent_span_context->trace_id, TRACE_ID_SIZE);
 		// generate_random_bytes(context.span_id, SPAN_ID_SIZE);
@@ -452,6 +457,7 @@ int uprobe_HandlerFunc_ServeHTTP(struct pt_regs *ctx) {
 // func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request)
 SEC("uprobe/HandlerFunc_ServeHTTP")
 int uprobe_HandlerFunc_ServeHTTP_Returns(struct pt_regs *ctx) {
+//	return 1;
 	cw_bpf_debug("[uprobe_HandlerFunc_ServeHTTP_Returns]");
 //	u64 end_time = bpf_ktime_get_ns();
 	__u64 pid_tgid = bpf_get_current_pid_tgid();

+ 83 - 60
ebpftracer/tls.go

@@ -23,7 +23,7 @@ import (
 )
 
 const (
-	minSupportedGoVersion = "v1.17.0"
+	minSupportedGoVersion = "v1.15.0"
 	goTlsWriteSymbol      = "crypto/tls.(*Conn).Write"
 	goTlsReadSymbol       = "crypto/tls.(*Conn).Read"
 	goExecute             = "runtime.execute"
@@ -201,13 +201,13 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 	}
 
 	offset, ok := tracer.GetOffset(tracer.NewID("std", "runtime", "g", "goid"), path)
+	bucketsOff, ok2 := tracer.GetOffset(tracer.NewID("std", "runtime", "hmap", "buckets"), path)
 
-	fmt.Println(offset, ok, version)
-	if ok {
+	if ok && ok2 {
 		realVersion := strings.Replace(bi.GoVersion, "go", "", 1)
 		parts := strings.Split(realVersion, ".")
 		var major, minor, revision int
-		if len(parts) >= 3 {
+		if len(parts) >= 2 {
 			major, err = strconv.Atoi(parts[0])
 			if err != nil {
 				log("Error converting major version:", err)
@@ -218,11 +218,13 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				log("Error converting minor version:", err)
 				return nil, err
 			}
-			revision, err = strconv.Atoi(parts[2])
-			if err != nil {
-				log("Error converting revision version:", err)
-				return nil, err
+			if len(parts) >= 3 {
+				revision, err = strconv.Atoi(parts[2])
+				if err != nil {
+					log("Error converting revision version:", err)
+				}
 			}
+
 			goVersion := ((major & 0xFF) << 16) + ((minor & 0xFF) << 8) + min(revision, 255)
 
 			info := EbpfProcInfo{}
@@ -233,29 +235,49 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 			info.CredentialsSyscallConnItab = uint64(0)
 			info.InstanceId = instanceID
 			info.AppId = appID
-			// go
-			info.MethodPtrPos = uint64(0)
-			info.UrlPtrPos = uint64(16)
-			info.PathPtrPos = uint64(56)
-			info.StatusCodePos = uint64(120)
-			info.RequestHostPos = uint64(128)
-			info.ProtoPos = uint64(24)
-			info.CtxPtrPos = uint64(232)
-			info.HeadersPtrPos = uint64(56)
-			info.BucketsPtrPos = uint64(16)
 			info.CodeType = codeType
+			// go
+			info.BucketsPtrPos = bucketsOff
+			fields := map[*uint64]tracer.ID{
+				&info.MethodPtrPos:   tracer.NewID("std", "net/http", "Request", "Method"),
+				&info.UrlPtrPos:      tracer.NewID("std", "net/http", "Request", "URL"),
+				&info.PathPtrPos:     tracer.NewID("std", "net/url", "URL", "Path"),
+				&info.StatusCodePos:  tracer.NewID("std", "net/http", "response", "status"),
+				&info.RequestHostPos: tracer.NewID("std", "net/http", "Request", "Host"),
+				&info.ProtoPos:       tracer.NewID("std", "net/http", "Request", "Proto"),
+				&info.CtxPtrPos:      tracer.NewID("std", "net/http", "Request", "ctx"),
+				&info.HeadersPtrPos:  tracer.NewID("std", "net/http", "Request", "Header"),
+			}
+
+			for field, id := range fields {
+				off, ok := tracer.GetOffset(id, path)
+				if !ok {
+					klog.Warnf("failed to get offset for ID: %v", id)
+				}
+				*field = off
+			}
+
 			// 获取内存地址
-			allocDetails, err := tracer.Allocate(int(pid))
-			if err == nil && allocDetails != nil {
-				info.StartAddr = allocDetails.StartAddr
-				info.EndAddr = allocDetails.EndAddr
+			if appInfo.GoProcCache.StartAddr == 0 && appInfo.GoProcCache.EndAddr == 0 {
+				allocDetails, allocErr := tracer.Allocate(int(pid))
+				if allocErr != nil {
+					return nil, allocErr
+				}
+				if allocDetails != nil {
+					//info.StartAddr = allocDetails.StartAddr
+					//info.EndAddr = allocDetails.EndAddr
+					appInfo.GoProcCache.StartAddr = allocDetails.StartAddr
+					appInfo.GoProcCache.EndAddr = allocDetails.EndAddr
+				}
 			}
+			info.StartAddr = appInfo.GoProcCache.StartAddr
+			info.EndAddr = appInfo.GoProcCache.EndAddr
 			klog.Debugln("Major:", major)
 			klog.Debugln("Minor:", minor)
 			klog.Debugln("Revision:", revision)
 			klog.Debugln("goVersion", goVersion)
-			klog.Debugln("info.StartAddr", info.StartAddr)
-			klog.Debugln("info.EndAddr", info.EndAddr)
+			klog.WithField("pid", pid).Debugln("info.StartAddr", info.StartAddr)
+			klog.WithField("pid", pid).Debugln("info.EndAddr", info.EndAddr)
 			_, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
 			if err != nil {
 				klog.Error("failed to update program info", err)
@@ -417,42 +439,43 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				links = append(links, l)
 			}
 
-		case goTlsWriteSymbol:
-			klog.Infoln("fucktls goTlsWriteSymbol crypto/tls uprobes attached")
-			l, err := exe.Uprobe(s.Name, t.uprobes["go_crypto_tls_write_enter"], &link.UprobeOptions{Address: address})
-			if err != nil {
-				klog.WithError(err).Errorln("failed to attach write_enter uprobe")
-				return nil, err
-			}
-			links = append(links, l)
-		case goTlsReadSymbol:
-			klog.Infoln("fucktls goTlsReadSymbol crypto/tls uprobes attached")
-			l, err := exe.Uprobe(s.Name, t.uprobes["go_crypto_tls_read_enter"], &link.UprobeOptions{Address: address})
-			if err != nil {
-				klog.WithError(err).Errorln("failed to attach read_enter uprobe")
-				return nil, err
-			}
-			links = append(links, l)
-			sStart := s.Value - textSection.Addr
-			sEnd := sStart + s.Size
-			if sEnd > textSectionLen {
-				continue
-			}
-			sBytes := textSectionData[sStart:sEnd]
-			returnOffsets := getReturnOffsets(ef.Machine, sBytes)
-			if len(returnOffsets) == 0 {
-				err = fmt.Errorf("failed to attach read_exit uprobe no return offsets found")
-				klog.Errorln(err)
-				return nil, err
-			}
-			for _, offset := range returnOffsets {
-				l, err := exe.Uprobe(s.Name, t.uprobes["go_crypto_tls_read_exit"], &link.UprobeOptions{Address: address, Offset: uint64(offset)})
-				if err != nil {
-					klog.WithError(err).Errorln("failed to attach read_exit uprobe")
-					return nil, err
-				}
-				links = append(links, l)
-			}
+			//case goTlsWriteSymbol:
+			//	klog.Infoln("fucktls goTlsWriteSymbol crypto/tls uprobes attached")
+			//	l, err := exe.Uprobe(s.Name, t.uprobes["go_crypto_tls_write_enter"], &link.UprobeOptions{Address: address})
+			//	if err != nil {
+			//		klog.WithError(err).Errorln("failed to attach write_enter uprobe")
+			//		return nil, err
+			//	}
+			//	links = append(links, l)
+
+			//case goTlsReadSymbol:
+			//	klog.Infoln("fucktls goTlsReadSymbol crypto/tls uprobes attached")
+			//	l, err := exe.Uprobe(s.Name, t.uprobes["go_crypto_tls_read_enter"], &link.UprobeOptions{Address: address})
+			//	if err != nil {
+			//		klog.WithError(err).Errorln("failed to attach read_enter uprobe")
+			//		return nil, err
+			//	}
+			//	links = append(links, l)
+			//	sStart := s.Value - textSection.Addr
+			//	sEnd := sStart + s.Size
+			//	if sEnd > textSectionLen {
+			//		continue
+			//	}
+			//	sBytes := textSectionData[sStart:sEnd]
+			//	returnOffsets := getReturnOffsets(ef.Machine, sBytes)
+			//	if len(returnOffsets) == 0 {
+			//		err = fmt.Errorf("failed to attach read_exit uprobe no return offsets found")
+			//		klog.Errorln(err)
+			//		return nil, err
+			//	}
+			//	for _, offset := range returnOffsets {
+			//		l, err := exe.Uprobe(s.Name, t.uprobes["go_crypto_tls_read_exit"], &link.UprobeOptions{Address: address, Offset: uint64(offset)})
+			//		if err != nil {
+			//			klog.WithError(err).Errorln("failed to attach read_exit uprobe")
+			//			return nil, err
+			//		}
+			//		links = append(links, l)
+			//	}
 		}
 	}
 	if len(links) == 0 {

+ 1 - 1
ebpftracer/tracer/allocate.go

@@ -44,7 +44,7 @@ func remoteAllocate(pid int, mapSize uint64) (uint64, error) {
 	}
 
 	defer func() {
-		klog.Info("Detaching from process", "pid", pid)
+		klog.Info("Detaching from process ", pid)
 		err := program.Detach()
 		if err != nil {
 			klog.Error(err, "Failed to detach ptrace", "pid", pid)

+ 4 - 3
ebpftracer/tracer/offset.go

@@ -12,7 +12,6 @@ import (
 	klog "github.com/sirupsen/logrus"
 	"io"
 	"net"
-	"os"
 	"strings"
 )
 
@@ -170,12 +169,13 @@ func GetOffset(id ID, path string) (uint64, bool) {
 
 	elfF, err := elf.Open(path)
 	if err != nil {
-		os.Exit(1)
+		klog.Error(err)
+		return 0, false
 	}
 	defer elfF.Close()
 
 	data, err := elfF.DWARF()
-	fmt.Println(err)
+	//fmt.Println(err)
 	r := data.Reader()
 	if !gotoEntry(r, dwarf.TagStructType, strct) {
 		return 0, false
@@ -183,6 +183,7 @@ func GetOffset(id ID, path string) (uint64, bool) {
 
 	e, err := findEntry(r, dwarf.TagMember, id.Field)
 	if err != nil {
+		klog.Error(err)
 		return 0, false
 	}
 

+ 42 - 0
kube/client.go

@@ -7,9 +7,11 @@ import (
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils/try"
 	"github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"strings"
+	"time"
 )
 
 var CwK8sClient *transfer.CwK8sClient
@@ -42,6 +44,7 @@ func NewKubeClient() (*transfer.CwK8sClient, error) {
 	try.GoParams(cwk8sclient.InformerSharedfactory.Start, utils.CatchFn, make(chan struct{}, 0))
 	CwK8sClient = cwk8sclient
 	logrus.Info("[kube] connect init success")
+	time.Sleep(1 * time.Second)
 	return cwk8sclient, nil
 }
 
@@ -69,3 +72,42 @@ func GetWorkload(ns, pod string) (string, error) {
 	}
 	return "", fmt.Errorf("workload not found")
 }
+
+func GetNodeIpByCore(nodeName string) (string, error) {
+	client, err := GetKubeClient()
+	if err != nil {
+		return "", err
+	}
+	nodeItem, err := client.InformerSharedfactory.Core().V1().Nodes().Lister().Get(nodeName)
+	if err != nil {
+		logrus.WithError(err).Error("[kube] get node info error")
+		for {
+			time.Sleep(1 * time.Second)
+			nodeItem, err = client.InformerSharedfactory.Core().V1().Nodes().Lister().Get(nodeName)
+			if err != nil {
+				logrus.WithError(err).Error("[kube] get node info error")
+			} else {
+				break
+			}
+		}
+	}
+	// 解析 Node IP
+	var internalIP, externalIP string
+	for _, addr := range nodeItem.Status.Addresses {
+		if addr.Type == v1.NodeInternalIP {
+			internalIP = addr.Address
+		} else if addr.Type == v1.NodeExternalIP {
+			externalIP = addr.Address
+		}
+	}
+
+	if internalIP == "" && externalIP == "" {
+		return "", fmt.Errorf("internalIP or externalIP not found")
+	}
+
+	if internalIP != "" {
+		return internalIP, nil
+	}
+
+	return externalIP, nil
+}

+ 7 - 7
main.go

@@ -157,6 +157,13 @@ func main() {
 
 	log.Infoln("agent version:", version)
 
+	if *flags.RunInContainer {
+		_, err = kube.NewKubeClient()
+		if err != nil {
+			log.WithError(err).Errorf("Failed to init kube client.")
+		}
+	}
+
 	hostname, kv, err := uname()
 	if err != nil {
 		log.Fatalln("failed to get uname:", err)
@@ -184,13 +191,6 @@ func main() {
 	tracing.Init(SystemUUID, hostname, version)
 	logs.Init(SystemUUID, hostname, version)
 
-	if *flags.RunInContainer {
-		_, err = kube.NewKubeClient()
-		if err != nil {
-			log.WithError(err).Errorf("Failed to init kube client.")
-		}
-	}
-
 	registry := prometheus.NewRegistry()
 	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"system_uuid": SystemUUID}, registry)
 

+ 11 - 5
cloudwise-apm-euspace.yaml → manifests/cloudwise-apm-euspace.yaml

@@ -27,7 +27,7 @@ spec:
         kubernetes.io/arch: amd64
       containers:
         - name: cloudwise-apm-euspace
-          image: harbor.cloudwise.com/apm/euspace_dev:1.0
+          image: harbor.cloudwise.com/apm/euspace_dev
           imagePullPolicy: Always
         # imagePullPolicy: IfNotPresent  
           args: ["--listen", "0.0.0.0:8123", "--cgroupfs-root", "/host/sys/fs/cgroup","--run-in-container"]
@@ -61,16 +61,22 @@ spec:
               readOnly: false
               mountPropagation: HostToContainer
           env:
-            - name: SEND
-              value: '1'
-            - name: TRACES_ENDPOINT
-              value: 'http://10.0.16.250:18080/docp/api/v2/data/receive'
+            - name: CONFIG_SERVER
+              value: '10.0.16.250:18080'
+            - name: DATA_SERVER
+              value: '10.0.16.250:18080'
             - name: DISABLE_E2E_TRACING
               value: 'false'
             - name: DISABLE_STACK_TRACING
               value: 'true'
             - name: DISABLE_REG_HOST
               value: 'false'
+            - name: CONSOLE_LOG
+              value: 'true'
+            - name: LOG_LEVEL
+              value: 'debug'
+            - name: SEND
+              value: '1'
       volumes:
         - name: sys-fs-cgroup
           hostPath:

+ 13 - 3
node/apm_host_info.go

@@ -4,6 +4,7 @@ import (
 	"encoding/base64"
 	"fmt"
 	"github.com/coroot/coroot-node-agent/flags"
+	"github.com/coroot/coroot-node-agent/kube"
 	"github.com/coroot/coroot-node-agent/utils"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	klog "github.com/sirupsen/logrus"
@@ -22,10 +23,19 @@ func NewNodeInfo(name, kv, version string) (*NodeInfoT, error) {
 		return ni, err
 	}
 
-	ip, err := utils.GetRealIp()
-	if err != nil {
-		return nil, err
+	var ip string
+	if *flags.RunInContainer {
+		ip, err = kube.GetNodeIpByCore(name)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		ip, err = utils.GetRealIp()
+		if err != nil {
+			return nil, err
+		}
 	}
+
 	n := &NodeInfoT{
 		Hostname:      name,
 		HostIp:        ip,

+ 1 - 1
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -807,7 +807,7 @@ func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
 	//var descAddr string
 	var method string
 	for _, attr := range event.Attributes {
-		//fmt.Println("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
+		klog.Debugln("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
 		switch attr.Key {
 		case "http.ip":
 			mNode.Ip = attr.Value.AsString()

+ 6 - 0
utils/modelse/app_info.go

@@ -81,6 +81,12 @@ type AppInfo struct {
 	PreStatus      APP_TYPE      `json:"pre_status"`
 	Status         APP_TYPE      `json:"status"`
 	Version        string        `json:"version"`
+	GoProcCache    ProcOnce      `json:"go_proc_cache"`
+}
+
+type ProcOnce struct {
+	StartAddr uint64 `json:"start_addr"`
+	EndAddr   uint64 `json:"end_addr"`
 }
 
 func (a *AppInfo) UpdateAtTime() {