Ver código fonte

eBPF tracer: kafka protocol support

Nikolay Sivko 3 anos atrás
pai
commit
99c3b381bc

+ 2 - 0
containers/container.go

@@ -501,6 +501,8 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpf
 			switch r.Protocol {
 			case ebpftracer.L7ProtocolHTTP:
 				status = strconv.Itoa(r.Status)
+			case ebpftracer.L7ProtocolMongo, ebpftracer.L7ProtocolKafka:
+				status = "unknown"
 			default:
 				if r.Status == 500 {
 					status = "failed"

+ 2 - 0
containers/metrics.go

@@ -80,6 +80,7 @@ var (
 		ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
 		ebpftracer.L7ProtocolMysql:     {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
 		ebpftracer.L7ProtocolMongo:     {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
+		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
 	}
 	L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
 		ebpftracer.L7ProtocolHTTP:      {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
@@ -88,6 +89,7 @@ var (
 		ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
 		ebpftracer.L7ProtocolMysql:     {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
 		ebpftracer.L7ProtocolMongo:     {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
+		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
 	}
 )
 

Diferenças do arquivo suprimidas por serem muito extensas
+ 0 - 0
ebpftracer/ebpf.go


+ 42 - 0
ebpftracer/ebpf/l7/kafka.c

@@ -0,0 +1,42 @@
+// https://kafka.apache.org/protocol.html
+
+struct kafka_request_header {
+    __s32 length;
+    __s16 api_key;
+    __s16 api_version;
+    __s32 correlation_id;
+};
+
+struct kafka_response_header {
+    __s32 length;
+    __s32 correlation_id;
+};
+
+static __always_inline
+__s32 is_kafka_request(char *buf, int buf_size) {
+    if (buf_size < 1) {
+        return 0;
+    }
+    struct kafka_request_header h = {};
+    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (h.correlation_id > 0 && (h.api_key >= 0 && h.api_key <= 67)) {
+        return h.correlation_id;
+    }
+    return 0;
+}
+
+static __always_inline
+__u32 parse_kafka_status(__s32 request_id, char *buf, int buf_size, __u8 partial) {
+    struct kafka_response_header h = {};
+    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (h.correlation_id == request_id) {
+        return 200;
+    }
+    return 0;
+}
+
+

+ 33 - 11
ebpftracer/ebpf/l7/l7.c

@@ -4,6 +4,8 @@
 #include "memcached.c"
 #include "mysql.c"
 #include "mongo.c"
+#include "kafka.c"
+
 
 #define PROTOCOL_UNKNOWN    0
 #define PROTOCOL_HTTP	    1
@@ -12,6 +14,7 @@
 #define PROTOCOL_MEMCACHED  4
 #define PROTOCOL_MYSQL      5
 #define PROTOCOL_MONGO      6
+#define PROTOCOL_KAFKA      7
 
 struct l7_event {
     __u64 fd;
@@ -49,6 +52,7 @@ struct l7_request {
     __u64 ns;
     __u8 protocol;
     __u8 partial;
+    __s32 request_id;
 };
 
 struct {
@@ -81,7 +85,13 @@ static inline __attribute__((__always_inline__))
 int trace_enter_write(__u64 fd, char *buf, __u64 size) {
     __u64 id = bpf_get_current_pid_tgid();
     struct l7_request req = {};
+    req.protocol = PROTOCOL_UNKNOWN;
     req.partial = 0;
+    req.request_id = 0;
+    req.ns = 0;
+    struct socket_key k = {};
+    k.pid = id >> 32;
+    k.fd = fd;
     if (is_http_request(buf)) {
         req.protocol = PROTOCOL_HTTP;
     } else if (is_postgres_query(buf, size)) {
@@ -95,12 +105,22 @@ int trace_enter_write(__u64 fd, char *buf, __u64 size) {
     } else if (is_mongo_query(buf, size)) {
         req.protocol = PROTOCOL_MONGO;
     } else {
+        __s32 request_id = is_kafka_request(buf, size);
+        if  (request_id > 0) {
+            req.request_id = request_id;
+            req.protocol = PROTOCOL_KAFKA;
+            struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
+            if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
+                req.ns = prev_req->ns;
+            }
+        }
+    }
+    if (req.protocol == PROTOCOL_UNKNOWN) {
         return 0;
     }
-    req.ns = bpf_ktime_get_ns();
-    struct socket_key k = {};
-    k.pid = id >> 32;
-    k.fd = fd;
+    if (req.ns == 0) {
+        req.ns = bpf_ktime_get_ns();
+    }
     bpf_map_update_elem(&active_l7_requests, &k, &req, BPF_ANY);
     return 0;
 }
@@ -138,6 +158,7 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
     if (!req) {
         return 0;
     }
+    __s32 request_id = req->request_id;
     struct l7_event e = {};
     e.protocol = req->protocol;
     e.fd = k.fd;
@@ -146,18 +167,17 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
     __u64 ns = req->ns;
     __u8 partial = req->partial;
     bpf_map_delete_elem(&active_l7_requests, &k);
-
-    if (req->protocol == PROTOCOL_HTTP) {
+    if (e.protocol == PROTOCOL_HTTP) {
         e.status = parse_http_status(buf);
-    } else if (req->protocol == PROTOCOL_POSTGRES) {
+    } else if (e.protocol == PROTOCOL_POSTGRES) {
         e.status = parse_postgres_status(buf, ctx->ret);
-    } else if (req->protocol == PROTOCOL_REDIS) {
+    } else if (e.protocol == PROTOCOL_REDIS) {
         e.status = parse_redis_status(buf, ctx->ret);
-    } else if (req->protocol == PROTOCOL_MEMCACHED) {
+    } else if (e.protocol == PROTOCOL_MEMCACHED) {
         e.status = parse_memcached_status(buf, ctx->ret);
-    } else if (req->protocol == PROTOCOL_MYSQL) {
+    } else if (e.protocol == PROTOCOL_MYSQL) {
         e.status = parse_mysql_status(buf, ctx->ret);
-    } else if (req->protocol == PROTOCOL_MONGO) {
+    } else if (e.protocol == PROTOCOL_MONGO) {
         e.status = parse_mongo_status(buf, ctx->ret, partial);
         if (e.status == 1) {
             struct l7_request r = {};
@@ -167,6 +187,8 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
             bpf_map_update_elem(&active_l7_requests, &k, &r, BPF_ANY);
             return 0;
         }
+    } else if (e.protocol == PROTOCOL_KAFKA) {
+        e.status = parse_kafka_status(request_id, buf, ctx->ret, partial);
     }
     if (e.status == 0) {
         return 0;

+ 3 - 0
ebpftracer/tracer.go

@@ -48,6 +48,7 @@ const (
 	L7ProtocolMemcached L7Protocol = 4
 	L7ProtocolMysql     L7Protocol = 5
 	L7ProtocolMongo     L7Protocol = 6
+	L7ProtocolKafka     L7Protocol = 7
 )
 
 func (p L7Protocol) String() string {
@@ -64,6 +65,8 @@ func (p L7Protocol) String() string {
 		return "Mysql"
 	case L7ProtocolMongo:
 		return "Mongo"
+	case L7ProtocolKafka:
+		return "Kafka"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }

Alguns arquivos não foram mostrados porque muitos arquivos mudaram nesse diff