Przeglądaj źródła

Feature #TASK_QT-22513 euspace适配kafka

rock.wu 1 rok temu
rodzic
commit
775fc88136

+ 16 - 1
containers/container.go

@@ -979,7 +979,22 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
 		stats.observe(r.Status.String(), "", r.Duration)
 		query := l7.ParseMongo(r.Payload)
 		trace.MongoQuery(query, r.Status.Error(), r.Duration)
-	case l7.ProtocolKafka, l7.ProtocolCassandra:
+	case l7.ProtocolKafka:
+		stats.observe(r.Status.String(), "", r.Duration)
+		// Extract the Kafka request type and topic from the payload.
+		kafkaInfo, err := l7.ParseKafkaProtocol(r.Payload)
+		if err != nil {
+			return err
+		}
+		if kafkaInfo != nil {
+			if kafkaInfo.RequestType == "kafka_produce" {
+				trace.KafkaProduceRequest(kafkaInfo.Topic, r.Status.Error(), r.Duration)
+			} 
+			// else if kafkaInfo.RequestType == "kafka_consume" {
+			// 	trace.KafkaConsumeRequest(kafkaInfo.Topic, r.Status.Error(), r.Duration)
+			// }
+		}
+	case l7.ProtocolCassandra:
 		stats.observe(r.Status.String(), "", r.Duration)
 	case l7.ProtocolRabbitmq, l7.ProtocolNats:
 		stats.observe(r.Status.String(), r.Method.String(), 0)

+ 23 - 1
containers/container_apm.go

@@ -268,7 +268,29 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		}
 		//query := l7.ParseMongo(r.Payload)
 		//trace.MongoQuery(query, r.Status.Error(), r.Duration)
-	case l7.ProtocolKafka, l7.ProtocolCassandra:
+	case l7.ProtocolKafka:
+		stats.observe(r.Status.String(), "", r.Duration)
+		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			// Extract the Kafka request type and topic from the payload.
+			kafkaInfo, err := l7.ParseKafkaProtocol(r.Payload)
+			if err != nil {
+				return err
+			}
+			if kafkaInfo != nil {
+				apmTrace, err := c.getOrInitTrace(r.TraceId)
+				if err == nil {
+					if kafkaInfo.RequestType == "kafka_produce" {
+						// apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
+						apmTrace.KafkaProduceRequestEvent(kafkaInfo.Topic,conn.ActualDest)
+					} 
+					// else if kafkaInfo.RequestType == "kafka_consume" {
+					// 	trace.KafkaConsumeRequest(kafkaInfo.Topic, r.Status.Error(), r.Duration)
+					// }
+					c.SendEvent(apmTrace, r.TraceId)
+				}
+			}
+		}
+	case l7.ProtocolCassandra:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
 		}

+ 86 - 0
ebpftracer/l7/kafka.go

@@ -0,0 +1,86 @@
+package l7
+
+import (
+	"encoding/binary"
+)
+
+// KafkaInfo is the result returned by ParseKafkaProtocol: the kind of Kafka
+// request found in a payload and the first topic that request names.
+type KafkaInfo struct {
+	RequestType string // "kafka_produce" or "kafka_consume", as assigned by ParseKafkaProtocol
+	Topic      string // name of the first topic carried by the request
+}
+
+// Kafka API keys that ParseKafkaProtocol recognizes. The parser compares them
+// against the 2-byte API key read from payload[4:6].
+const (
+	ProduceRequest = 0 // reported by the parser as "kafka_produce"
+	FetchRequest   = 1 // reported by the parser as "kafka_consume"
+)
+
+// ParseKafkaProtocol analyzes Kafka protocol to determine request type and topic
+func ParseKafkaProtocol(payload []byte) (*KafkaInfo, error) {
+	// Kafka request should be at least 8 bytes
+	// 4 bytes for length + 2 bytes for API key + 2 bytes for API version
+	if len(payload) < 8 {
+		return nil, nil
+	}
+
+	// Skip the first 4 bytes (length)
+	// Get API Key from the next 2 bytes
+	apiKey := binary.BigEndian.Uint16(payload[4:6])
+	
+	// Skip API version (2 bytes) and correlation ID (4 bytes)
+	currentOffset := 10
+	
+	// Get client ID length
+	if len(payload) < currentOffset+2 {
+		return nil, nil
+	}
+	clientIDLen := binary.BigEndian.Uint16(payload[currentOffset:currentOffset+2])
+	currentOffset += 2 + int(clientIDLen)
+	
+	// Get topics array length
+	if len(payload) < currentOffset+4 {
+		return nil, nil
+	}
+	
+	// For produce requests, we need to skip the acks (2 bytes) and timeout (4 bytes)
+	if apiKey == ProduceRequest {
+		currentOffset += 6
+	}
+	
+	// Read number of topics
+	if len(payload) < currentOffset+4 {
+		return nil, nil
+	}
+	topicArrayLen := binary.BigEndian.Uint32(payload[currentOffset:currentOffset+4])
+	currentOffset += 4
+	
+	// We only process the first topic for now
+	if topicArrayLen > 0 {
+		if len(payload) < currentOffset+2 {
+			return nil, nil
+		}
+		topicLen := binary.BigEndian.Uint16(payload[currentOffset:currentOffset+2])
+		currentOffset += 2
+		
+		if len(payload) < currentOffset+int(topicLen) {
+			return nil, nil
+		}
+		topic := string(payload[currentOffset:currentOffset+int(topicLen)])
+		
+		info := &KafkaInfo{Topic: topic}
+		
+		switch apiKey {
+		case ProduceRequest:
+			info.RequestType = "kafka_produce"
+		case FetchRequest:
+			info.RequestType = "kafka_consume"
+		default:
+			return nil, nil
+		}
+		
+		return info, nil
+	}
+
+	return nil, nil
+}

+ 2 - 0
ebpftracer/l7/l7.go

@@ -99,6 +99,8 @@ func (p Protocol) ServiceNameString() string {
 		return "DNS"
 	case ProtocolDM:
 		return "DM"
+	case ProtocolKafka:
+		return "KAFKA"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }

+ 30 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -26,6 +26,7 @@ const (
 	NOSQL_SERVICE_TYPE = "NOSQL"
 	HTTP_SERVICE_TYPE  = "HTTP"
 	NET_SERVICE_TYPE   = "L7_NET"
+	MQ_SERVICE_TYPE    = "MQ"
 )
 
 const (
@@ -35,6 +36,7 @@ const (
 	REDIS_SERVICE_NAME      = "REDIS"
 	HTTP_SERVICE_NAME       = "HTTPCLIENT"
 	POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
+	KAFKA_SERVICE_NAME      = "KAFKA"
 )
 
 type apmTraceSpan tracesdk.ReadOnlySpan
@@ -120,6 +122,7 @@ type MapInfoT struct {
 	SpanId          string   `json:"span_id,omitempty"`
 	SrcAddr         string   `json:"src_addr,omitempty"`
 	DestinationAddr string   `json:"destination_addr,omitempty"`
+	Flow          	int64    `json:"flow,omitempty"`	
 }
 
 type TraceMapT struct {
@@ -190,6 +193,8 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 					buildSQLMapEvent(&mNode, event)
 				case l7.ProtocolPostgres:
 					buildSQLMapEvent(&mNode, event)
+				case l7.ProtocolKafka:
+					buildMQMapEvent(&mNode, event)
 				}
 			}
 
@@ -899,6 +904,31 @@ func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	}
 }
 
+// buildMQMapEvent fills mNode from a message-queue (Kafka) trace event.
+//
+// The service name comes from the event's protocol type and the service type
+// is MQ_SERVICE_TYPE. Peer address, operation type, topic (stored in Uri) and
+// source/destination addresses are copied from the event attributes.
+// Flow and Scheme are reset to their zero values.
+func buildMQMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
+	// Message queues have their own service type; they are not SQL services.
+	mNode.ServiceType = MQ_SERVICE_TYPE
+	for _, attr := range event.Attributes {
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		// "mq.opertype" is the key the trace producer emits; the historical
+		// misspelling "mq.operatype" is still accepted.
+		case "mq.opertype", "mq.operatype":
+			mNode.OperType = attr.Value.AsString()
+		case "mq.uri":
+			mNode.Uri = attr.Value.AsString()
+		case "mq.src_addr":
+			mNode.SrcAddr = attr.Value.AsString()
+		case "mq.destination_addr":
+			mNode.DestinationAddr = attr.Value.AsString()
+		}
+	}
+	mNode.Flow = 0
+	mNode.Scheme = ""
+}
+
 func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
 	mNode.ServiceType = NET_SERVICE_TYPE

+ 21 - 0
tracing/apm_tracing.go

@@ -473,6 +473,27 @@ func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destin
 	t.createTraceEvent(l7.ProtocolRedis.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolRedis.Int(), attr...)
 }
 
+// KafkaProduceRequestEvent records a Kafka produce request against topic as an
+// L7 trace event carrying the peer address, the operation type and the topic.
+// It does nothing on a nil *Trace. With an empty topic it still calls
+// t.addEvent() but creates no trace event.
+//
+// NOTE(review): the body reads r.ComponentSAddr, r.ComponentDAddr, r.StartAt,
+// r.EndAt and r.Duration, but no r is declared in this signature or body.
+// Unless a package-level r exists outside this view, this does not compile.
+// Presumably the method should accept r *l7.RequestData the way
+// RedisTraceQueryEvent does — confirm, and update the caller accordingly.
+func (t *Trace) KafkaProduceRequestEvent(topic string,destination netaddr.IPPort) {
+	if t == nil {
+		return
+	}
+	// addEvent runs before the topic check, so it is invoked even for requests
+	// that end up producing no trace event.
+	t.addEvent()
+	if topic == "" {
+		return
+	}
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+		attribute.String("mq.opertype", "produce"),
+		attribute.String("mq.uri", topic),
+		attribute.String("mq.src_addr", r.ComponentSAddr.String()),
+		attribute.String("mq.destination_addr", r.ComponentDAddr.String()),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolKafka.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolKafka.Int(), attr...)
+}
+
 func (t *Trace) DNSTraceQueryEvent(r *l7.RequestData, _type string, fqdn string, ttl uint32, ips []netaddr.IP) {
 	if t == nil {
 		return