Quellcode durchsuchen

added support for the NATS protocol

Nikolay Sivko vor 2 Jahren
Ursprung
Commit
dcf1c8fc55
7 geänderte Dateien mit 78 neuen und 14 gelöschten Zeilen
  1. 2 0
      containers/app.go
  2. 2 2
      containers/container.go
  3. 2 0
      containers/metrics.go
  4. 0 0
      ebpftracer/ebpf.go
  5. 34 11
      ebpftracer/ebpf/l7/l7.c
  6. 32 0
      ebpftracer/ebpf/l7/nats.c
  7. 6 1
      ebpftracer/tracer.go

+ 2 - 0
containers/app.go

@@ -85,6 +85,8 @@ func guessApplicationType(cmdline []byte) string {
 		return "ceph"
 		return "ceph"
 	case bytes.HasSuffix(cmd, []byte("rook")):
 	case bytes.HasSuffix(cmd, []byte("rook")):
 		return "rook"
 		return "rook"
+	case bytes.HasSuffix(cmd, []byte("nats-server")):
+		return "nats"
 	}
 	}
 	//todo: php-fpm, python, nodejs, java
 	//todo: php-fpm, python, nodejs, java
 	return ""
 	return ""

+ 2 - 2
containers/container.go

@@ -594,7 +594,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpf
 				}
 				}
 				if cOpts.Name != "" {
 				if cOpts.Name != "" {
 					labels := []string{"status"}
 					labels := []string{"status"}
-					if r.Protocol == ebpftracer.L7ProtocolRabbitmq {
+					if r.Protocol == ebpftracer.L7ProtocolRabbitmq || r.Protocol == ebpftracer.L7ProtocolNats {
 						labels = append(labels, "method")
 						labels = append(labels, "method")
 					}
 					}
 					s.Requests = prometheus.NewCounterVec(
 					s.Requests = prometheus.NewCounterVec(
@@ -614,7 +614,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpf
 				stats[key] = s
 				stats[key] = s
 			}
 			}
 			if s.Requests != nil {
 			if s.Requests != nil {
-				if r.Protocol == ebpftracer.L7ProtocolRabbitmq {
+				if r.Protocol == ebpftracer.L7ProtocolRabbitmq || r.Protocol == ebpftracer.L7ProtocolNats {
 					s.Requests.WithLabelValues(r.StatusString(), r.Method.String()).Inc()
 					s.Requests.WithLabelValues(r.StatusString(), r.Method.String()).Inc()
 				} else {
 				} else {
 					s.Requests.WithLabelValues(r.StatusString()).Inc()
 					s.Requests.WithLabelValues(r.StatusString()).Inc()

+ 2 - 0
containers/metrics.go

@@ -99,6 +99,7 @@ var (
 		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
 		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
 		ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
 		ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
 		ebpftracer.L7ProtocolRabbitmq:  {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
 		ebpftracer.L7ProtocolRabbitmq:  {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
+		ebpftracer.L7ProtocolNats:      {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
 	}
 	}
 	L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
 	L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
 		ebpftracer.L7ProtocolHTTP:      {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
 		ebpftracer.L7ProtocolHTTP:      {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
@@ -110,6 +111,7 @@ var (
 		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
 		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
 		ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
 		ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
 		ebpftracer.L7ProtocolRabbitmq:  {Name: "", Help: ""},
 		ebpftracer.L7ProtocolRabbitmq:  {Name: "", Help: ""},
+		ebpftracer.L7ProtocolNats:      {Name: "", Help: ""},
 	}
 	}
 )
 )
 
 

Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 0
ebpftracer/ebpf.go


+ 34 - 11
ebpftracer/ebpf/l7/l7.c

@@ -1,13 +1,3 @@
-#include "http.c"
-#include "postgres.c"
-#include "redis.c"
-#include "memcached.c"
-#include "mysql.c"
-#include "mongo.c"
-#include "kafka.c"
-#include "cassandra.c"
-#include "rabbitmq.c"
-
 #define PROTOCOL_UNKNOWN    0
 #define PROTOCOL_UNKNOWN    0
 #define PROTOCOL_HTTP	    1
 #define PROTOCOL_HTTP	    1
 #define PROTOCOL_POSTGRES	2
 #define PROTOCOL_POSTGRES	2
@@ -18,6 +8,7 @@
 #define PROTOCOL_KAFKA      7
 #define PROTOCOL_KAFKA      7
 #define PROTOCOL_CASSANDRA  8
 #define PROTOCOL_CASSANDRA  8
 #define PROTOCOL_RABBITMQ   9
 #define PROTOCOL_RABBITMQ   9
+#define PROTOCOL_NATS      10
 
 
 #define METHOD_UNKNOWN           0
 #define METHOD_UNKNOWN           0
 #define METHOD_PRODUCE           1
 #define METHOD_PRODUCE           1
@@ -25,6 +16,17 @@
 #define METHOD_STATEMENT_PREPARE 3
 #define METHOD_STATEMENT_PREPARE 3
 #define METHOD_STATEMENT_CLOSE   4
 #define METHOD_STATEMENT_CLOSE   4
 
 
+#include "http.c"
+#include "postgres.c"
+#include "redis.c"
+#include "memcached.c"
+#include "mysql.c"
+#include "mongo.c"
+#include "kafka.c"
+#include "cassandra.c"
+#include "rabbitmq.c"
+#include "nats.c"
+
 #define MAX_PAYLOAD_SIZE 512
 #define MAX_PAYLOAD_SIZE 512
 
 
 struct l7_event {
 struct l7_event {
@@ -198,7 +200,19 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size)
         e->status = 200;
         e->status = 200;
         e->method = METHOD_PRODUCE;
         e->method = METHOD_PRODUCE;
         e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-        bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
+        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+        return 0;
+    } else if (nats_method(buf, size) == METHOD_PRODUCE) {
+        struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+        if (!e) {
+            return 0;
+        }
+        e->protocol = PROTOCOL_NATS;
+        e->fd = k.fd;
+        e->pid = k.pid;
+        e->status = 200;
+        e->method = METHOD_PRODUCE;
+        e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
         return 0;
     } else {
     } else {
@@ -285,6 +299,15 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
         return 0;
     }
     }
+    if (nats_method(args->buf, ret) == METHOD_CONSUME) {
+        e->protocol = PROTOCOL_NATS;
+        e->status = 200;
+        e->method = METHOD_CONSUME;
+        e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+        return 0;
+    }
+
 
 
     struct cassandra_header cassandra_response = {};
     struct cassandra_header cassandra_response = {};
     cassandra_response.stream_id = -1;
     cassandra_response.stream_id = -1;

+ 32 - 0
ebpftracer/ebpf/l7/nats.c

@@ -0,0 +1,32 @@
+// https://docs.nats.io/reference/reference-protocols/nats-protocol
+
+static __always_inline
+int nats_method(char *buf, __u64 buf_size) {
+    if (buf_size < 7) {
+        return 0;
+    }
+    char b[5];
+    char end[2];
+    if (bpf_probe_read(&b, sizeof(b), (void *)buf) < 0) {
+        return 0;
+    }
+    if (bpf_probe_read(&end, sizeof(end), (void *)(buf+buf_size-2)) < 0) {
+        return 0;
+    }
+    if (end[0] != '\r' || end[1] != '\n') {
+        return 0;
+    }
+    if (b[0] == 'P' && b[1] == 'U' && b[2] == 'B' && (b[3] == ' ' || b[3] == '\t')) {
+        return METHOD_PRODUCE;
+    }
+    if (b[0] == 'H' && b[1] == 'P' && b[2] == 'U' && b[3] == 'B' && (b[4] == ' ' || b[4] == '\t')) {
+        return METHOD_PRODUCE;
+    }
+    if (b[0] == 'M' && b[1] == 'S' && b[2] == 'G' && (b[3] == ' ' || b[3] == '\t')) {
+        return METHOD_CONSUME;
+    }
+    if (b[0] == 'H' && b[1] == 'M' && b[2] == 'S' && b[3] == 'G' && (b[4] == ' ' || b[4] == '\t')) {
+        return METHOD_CONSUME;
+    }
+    return 0;
+}

+ 6 - 1
ebpftracer/tracer.go

@@ -54,6 +54,7 @@ const (
 	L7ProtocolKafka     L7Protocol = 7
 	L7ProtocolKafka     L7Protocol = 7
 	L7ProtocolCassandra L7Protocol = 8
 	L7ProtocolCassandra L7Protocol = 8
 	L7ProtocolRabbitmq  L7Protocol = 9
 	L7ProtocolRabbitmq  L7Protocol = 9
+	L7ProtocolNats      L7Protocol = 10
 )
 )
 
 
 func (p L7Protocol) String() string {
 func (p L7Protocol) String() string {
@@ -74,6 +75,10 @@ func (p L7Protocol) String() string {
 		return "Kafka"
 		return "Kafka"
 	case L7ProtocolCassandra:
 	case L7ProtocolCassandra:
 		return "Cassandra"
 		return "Cassandra"
+	case L7ProtocolRabbitmq:
+		return "Rabbitmq"
+	case L7ProtocolNats:
+		return "NATS"
 	}
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }
 }
@@ -113,7 +118,7 @@ func (r *L7Request) StatusString() string {
 	switch r.Protocol {
 	switch r.Protocol {
 	case L7ProtocolHTTP:
 	case L7ProtocolHTTP:
 		return strconv.Itoa(r.Status)
 		return strconv.Itoa(r.Status)
-	case L7ProtocolMongo, L7ProtocolKafka, L7ProtocolRabbitmq:
+	case L7ProtocolMongo, L7ProtocolKafka, L7ProtocolRabbitmq, L7ProtocolNats:
 		return "unknown"
 		return "unknown"
 	}
 	}
 	if r.Status == 500 {
 	if r.Status == 500 {

Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.