소스 검색

Feature #TASK_QT-18250 poc xlsmart

Carl 9 달 전
부모
커밋
ddedb5e87f

+ 11 - 2
containers/container_apm.go

@@ -283,9 +283,18 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	case l7.ProtocolMongo:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			query := l7.ParseMongo(r.Payload)
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
+			}
+
+			apmTrace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				//apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
+				apmTrace.MongoTraceQueryEvent(query, r, conn.ActualDest)
+				c.SendEvent(apmTrace, r.TraceId)
+			}
 		}
-		//query := l7.ParseMongo(r.Payload)
-		//trace.MongoQuery(query, r.Status.Error(), r.Duration)
 	case l7.ProtocolKafka, l7.ProtocolCassandra:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {

+ 0 - 2
ebpftracer/ebpf/l7/mongo.c

@@ -12,7 +12,6 @@ struct mongo_header {
 
 static __always_inline
 int is_mongo_query(char *buf, __u64 buf_size) {
-	return 0;
     struct mongo_header h = {};
     if (buf_size < sizeof(h)) {
         return 0;
@@ -26,7 +25,6 @@ int is_mongo_query(char *buf, __u64 buf_size) {
 
 static __always_inline
 int is_mongo_response(char *buf, __u64 buf_size, __u8 partial) {
-	return 0;
     if (partial == 0 && buf_size == 4) { //partial read
         return 2;
     }

+ 41 - 21
ebpftracer/ebpf/utrace/go/include/go_types.h

@@ -23,24 +23,24 @@
  Keep a power of 2 to help with masks */
 #define MAX_SLICE_ARRAY_SIZE 1024
 
-struct go_string_ot
+typedef struct go_string_ot
 {
-    char *str;
-    s64 len;
-};
+	char *str;
+	s64 len;
+} go_string_ot;
 
-struct go_slice_ot
+typedef struct go_slice_ot
 {
-    void *array;
-    s64 len;
-    s64 cap;
-};
+	void *array;
+	s64 len;
+	s64 cap;
+} go_slice_ot;
 
-struct go_iface
+typedef struct go_iface
 {
-    void *tab;
-    void *data;
-};
+	void *type;
+	void *data;
+} go_iface_t;
 
 struct map_bucket {
     char tophash[8];
@@ -55,20 +55,40 @@ struct map_bucket {
 	u8 bucket_index;
 
 };
-
-struct slice_array_buff
-{
-    unsigned char buff[MAX_SLICE_ARRAY_SIZE];
+// a map bucket type with the given key and value types
+#define MAP_BUCKET_TYPE(key_type, value_type) struct map_bucket_##key_type##_##value_type##_t
+// a map bucket struct definition with the given key and value types
+// for more details about the structure of a map bucket see:
+// https://github.com/golang/go/blob/639cc0dcc0948dd02c9d5fc12fbed730a21ebebc/src/runtime/map.go#L143
+#define MAP_BUCKET_DEFINITION(key_type, value_type) \
+MAP_BUCKET_TYPE(key_type, value_type) { \
+    char tophash[8]; \
+    key_type keys[8]; \
+    value_type values[8]; \
+    void *overflow; \
 };
 
 struct
 {
-    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-    __type(key, u32);
-    __type(value, struct slice_array_buff);
-    __uint(max_entries, 1);
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__type(key, u32);
+	__type(value, struct slice_array_buff);
+	__uint(max_entries, 1);
 } slice_array_buff_map SEC(".maps");
 
+struct slice_array_buff
+{
+    unsigned char buff[MAX_SLICE_ARRAY_SIZE];
+};
+//
+//struct
+//{
+//    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+//    __type(key, u32);
+//    __type(value, struct slice_array_buff);
+//    __uint(max_entries, 1);
+//} slice_array_buff_map SEC(".maps");
+
 // In Go, interfaces are represented as a pair of pointers: a pointer to the
 // interface data, and a pointer to the interface table.
 // See: runtime.iface in https://golang.org/src/runtime/runtime2.go

+ 111 - 5
ebpftracer/ebpf/utrace/go/net/server.probe.bpf.c

@@ -301,6 +301,112 @@ static __always_inline struct map_bucket *get_map_bucket(void *headers_ptr_ptr)
 	return NULL;
 }
 
+
+MAP_BUCKET_DEFINITION(go_string_ot,  go_slice_ot)
+
+static __always_inline char *extract_context_from_req_headers_go_map(void *headers_ptr_ptr)
+{
+	void *headers_ptr;
+	long res;
+	res = bpf_probe_read(&headers_ptr, sizeof(headers_ptr), headers_ptr_ptr);
+	if (res < 0)
+	{
+		return NULL;
+	}
+	u64 headers_count = 0;
+	res = bpf_probe_read(&headers_count, sizeof(headers_count), headers_ptr);
+	if (res < 0)
+	{
+		return NULL;
+	}
+	if (headers_count == 0)
+	{
+		return NULL;
+	}
+	unsigned char log_2_bucket_count;
+	res = bpf_probe_read(&log_2_bucket_count, sizeof(log_2_bucket_count), headers_ptr + 9);
+	if (res < 0)
+	{
+		return NULL;
+	}
+	u64 bucket_count = 1 << log_2_bucket_count;
+	void *header_buckets;
+	/**/
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info =
+			bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if(!proc_info)
+	{
+		return NULL;
+	}
+	/**/
+
+	res = bpf_probe_read(&header_buckets, sizeof(header_buckets), (void*)(headers_ptr + proc_info->buckets_ptr_pos));
+	if (res < 0)
+	{
+		return NULL;
+	}
+	u32 map_id = 0;
+	MAP_BUCKET_TYPE(go_string_ot, go_slice_ot) *map_value = bpf_map_lookup_elem(&golang_mapbucket_storage_map, &map_id);
+	if (!map_value)
+	{
+		return NULL;
+	}
+
+	for (u64 j = 0; j < MAX_BUCKETS; j++)
+	{
+		if (j >= bucket_count)
+		{
+			break;
+		}
+		res = bpf_probe_read(map_value, sizeof(MAP_BUCKET_TYPE(go_string_ot, go_slice_ot)), header_buckets + (j * sizeof(MAP_BUCKET_TYPE(go_string_ot, go_slice_ot))));
+		if (res < 0)
+		{
+			continue;
+		}
+		for (u64 i = 0; i < 8; i++)
+		{
+			if (map_value->tophash[i] == 0)
+			{
+				continue;
+			}
+			if (map_value->keys[i].len != CW_HEADER_KEY_LENGTH)
+			{
+				continue;
+			}
+			char current_header_key[CW_HEADER_KEY_LENGTH];
+			bpf_probe_read(current_header_key, sizeof(current_header_key), map_value->keys[i].str);
+			if (!bpf_memcmp(current_header_key, CW_HEADER_KEY_VAL, CW_HEADER_KEY_LENGTH) && !bpf_memcmp(current_header_key, CW_HEADER_KEY_UFIRST_VAL, CW_HEADER_KEY_LENGTH))
+			{
+				continue;
+			}
+			void *traceparent_header_value_ptr = map_value->values[i].array;
+			struct go_string_ot traceparent_header_value_go_str;
+			res = bpf_probe_read(&traceparent_header_value_go_str, sizeof(traceparent_header_value_go_str), traceparent_header_value_ptr);
+			if (res < 0)
+			{
+				return NULL;
+			}
+			if (traceparent_header_value_go_str.len != CW_HEADER_VAL_LENGTH)
+			{
+				continue;
+			}
+			char traceparent_header_value2[CW_HEADER_VAL_LENGTH];
+			res = bpf_probe_read(&traceparent_header_value2, sizeof(traceparent_header_value2), traceparent_header_value_go_str.str);
+			if (res < 0)
+			{
+				return NULL;
+			}
+			bpf_printk("has cw header111 j=%d i=%d %s", j, i,traceparent_header_value_go_str.str);
+
+			return traceparent_header_value_go_str.str;
+//			w3c_string_to_span_context(traceparent_header_value, parent_span_context);
+//			return 0;
+		}
+	}
+	return NULL;
+}
 // 获取 header_val
 static __always_inline char *get_header_val(struct map_bucket *map_value,u32 off,u32 count) {
 	for (u32 i = off; i < count; i++) {
@@ -387,13 +493,13 @@ int uprobe_HandlerFunc_ServeHTTP(struct pt_regs *ctx) {
 
 	// Propagate context
 	void *req_ptr = get_argument(ctx, 4);
-	struct map_bucket * map_bucket_p = get_map_bucket((void *) (req_ptr + proc_info->headers_ptr_pos));
+//	struct map_bucket * map_bucket_p = get_map_bucket((void *) (req_ptr + proc_info->headers_ptr_pos));
 
-	if (map_bucket_p == NULL) {
-		return 0;
-	}
+	char *traceparent_header_value = extract_context_from_req_headers_go_map(
+			(void *) (req_ptr + proc_info->headers_ptr_pos));
+//	bpf_printk("111 %s",traceparent_header_value);
 
-	char * traceparent_header_value = get_header_val_off(map_bucket_p);
+//	char * traceparent_header_value = get_header_val_off(map_bucket_p);
 	struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&cw_parent_span_context_storage_map, &map_id);
 	if (!cw_parent_span_context) {
 		return 0;

+ 2 - 1
ebpftracer/tracer.go

@@ -234,7 +234,8 @@ func (t *Tracer) init(ch chan<- Event) error {
 		typ := EventTypeConnectionOpen
 		if s.Listen {
 			typ = EventTypeListenOpen
-		} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
+			//} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
+		} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] { // 存在误判
 			continue
 		}
 		ch <- Event{

+ 24 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -32,6 +32,7 @@ const (
 	MYSQL_SERVICE_NAME      = "MYSQL"
 	DM_SERVICE_NAME         = "DM"
 	REDIS_SERVICE_NAME      = "REDIS"
+	MONGO_SERVICE_NAME      = "MONGODB"
 	HTTP_SERVICE_NAME       = "HTTPCLIENT"
 	POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
 )
@@ -184,6 +185,8 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 					buildSQLMapEvent(&mNode, event)
 				case l7.ProtocolRedis:
 					buildRedisMapEvent(&mNode, event)
+				case l7.ProtocolMongo:
+					buildMongoMapEvent(&mNode, event)
 				// dm
 				case l7.ProtocolDM:
 					buildSQLMapEvent(&mNode, event)
@@ -1050,6 +1053,27 @@ func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	}
 }
 
+func buildMongoMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = MONGO_SERVICE_NAME
+	mNode.ServiceType = NOSQL_SERVICE_TYPE
+	for _, attr := range event.Attributes {
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "db.statement":
+			query := attr.Value.AsString()
+			mNode.MethodName = query
+			mNode.Ps = []string{query}
+		case "nosql.src_addr":
+			mNode.SrcAddr = attr.Value.AsString()
+		case "nosql.destination_addr":
+			mNode.DestinationAddr = attr.Value.AsString()
+		}
+	}
+}
+
 func isEnter(_type string) bool {
 	if _type == "APPLICATION" {
 		return true

+ 22 - 0
tracing/apm_tracing.go

@@ -522,6 +522,28 @@ func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destin
 	t.createTraceEvent(l7.ProtocolRedis.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolRedis.Int(), attr...)
 }
 
+func (t *Trace) MongoTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
+	if t == nil {
+		return
+	}
+	t.addEvent()
+	if query == "" {
+		return
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.DBSystemRedis,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+		attribute.String("nosql.src_addr", r.ComponentSAddr.String()),
+		attribute.String("nosql.destination_addr", r.ComponentDAddr.String()),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolMongo.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolMongo.Int(), attr...)
+}
+
 func (t *Trace) DNSTraceQueryEvent(r *l7.RequestData, _type string, fqdn string, ttl uint32, ips []netaddr.IP) {
 	if t == nil {
 		return