Browse Source

eBPF-based tracing

Nikolay Sivko 3 years ago
parent
commit
b0eaa0e3df

+ 15 - 5
containers/container.go

@@ -9,6 +9,7 @@ import (
 	"github.com/coroot/coroot-node-agent/node"
 	"github.com/coroot/coroot-node-agent/pinger"
 	"github.com/coroot/coroot-node-agent/proc"
+	"github.com/coroot/coroot-node-agent/tracing"
 	"github.com/coroot/logparser"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/vishvananda/netns"
@@ -65,6 +66,8 @@ type ActiveConnection struct {
 	Fd         uint64
 	Timestamp  uint64
 	Closed     time.Time
+
+	PreparedStatements map[string]string
 }
 
 type L7Stats struct {
@@ -73,6 +76,7 @@ type L7Stats struct {
 }
 
 type Container struct {
+	id       ContainerID
 	cgroup   *cgroup.Cgroup
 	metadata *ContainerMetadata
 
@@ -113,13 +117,14 @@ type Container struct {
 	done chan struct{}
 }
 
-func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
+func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
 	netNs, err := proc.GetNetNs(pid)
 	if err != nil {
 		return nil, err
 	}
 	defer netNs.Close()
 	c := &Container{
+		id:       id,
 		cgroup:   cg,
 		metadata: md,
 
@@ -458,10 +463,11 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 		}
 		c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
 		c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
-			ActualDest: *actualDst,
-			Pid:        pid,
-			Fd:         fd,
-			Timestamp:  timestamp,
+			ActualDest:         *actualDst,
+			Pid:                pid,
+			Fd:                 fd,
+			Timestamp:          timestamp,
+			PreparedStatements: map[string]string{},
 		}
 	}
 	c.connectLastAttempt[dst] = time.Now()
@@ -514,6 +520,10 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpf
 				stats = map[AddrPair]*L7Stats{}
 				c.l7Stats[r.Protocol] = stats
 			}
+			tracing.HandleL7Request(string(c.id), conn.ActualDest, r, conn.PreparedStatements)
+			if r.Method == ebpftracer.L7MethodStatementClose {
+				return
+			}
 			s := stats[key]
 			if s == nil {
 				constLabels := map[string]string{"destination": key.src.String(), "actual_destination": key.dst.String()}

+ 1 - 1
containers/registry.go

@@ -284,7 +284,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		r.containersByCgroupId[cg.Id] = c
 		return c
 	}
-	c, err := NewContainer(cg, md, r.hostConntrack, pid)
+	c, err := NewContainer(id, cg, md, r.hostConntrack, pid)
 	if err != nil {
 		klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
 		return nil

File diff suppressed because it is too large
+ 0 - 0
ebpftracer/ebpf.go


+ 151 - 72
ebpftracer/ebpf/l7/l7.c

@@ -8,7 +8,6 @@
 #include "cassandra.c"
 #include "rabbitmq.c"
 
-
 #define PROTOCOL_UNKNOWN    0
 #define PROTOCOL_HTTP	    1
 #define PROTOCOL_POSTGRES	2
@@ -20,9 +19,13 @@
 #define PROTOCOL_CASSANDRA  8
 #define PROTOCOL_RABBITMQ   9
 
-#define METHOD_UNKNOWN 0
-#define METHOD_PRODUCE 1
-#define METHOD_CONSUME 2
+#define METHOD_UNKNOWN           0
+#define METHOD_PRODUCE           1
+#define METHOD_CONSUME           2
+#define METHOD_STATEMENT_PREPARE 3
+#define METHOD_STATEMENT_CLOSE   4
+
+#define MAX_PAYLOAD_SIZE 512
 
 struct l7_event {
     __u64 fd;
@@ -32,8 +35,18 @@ struct l7_event {
     __u64 duration;
     __u8 protocol;
     __u8 method;
+    __u16 padding;
+    __u32 statement_id;
+    char payload[MAX_PAYLOAD_SIZE];
 };
 
+struct {
+     __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+     __type(key, __u32);
+     __type(value, struct l7_event);
+     __uint(max_entries, 1);
+} l7_event_heap SEC(".maps");
+
 struct {
     __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
     __uint(key_size, sizeof(int));
@@ -63,9 +76,18 @@ struct l7_request {
     __u64 ns;
     __u8 protocol;
     __u8 partial;
+    __u8 request_type;
     __s32 request_id;
+    char payload[MAX_PAYLOAD_SIZE];
 };
 
+struct {
+     __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+     __type(key, __u32);
+     __type(value, struct l7_request);
+     __uint(max_entries, 1);
+} l7_request_heap SEC(".maps");
+
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(key_size, sizeof(struct socket_key));
@@ -107,61 +129,101 @@ __u64 get_connection_timestamp(__u32 pid, __u64 fd) {
 static inline __attribute__((__always_inline__))
 int trace_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx, __u64 fd, char *buf, __u64 size) {
     __u64 id = bpf_get_current_pid_tgid();
-    struct l7_request req = {};
-    req.protocol = PROTOCOL_UNKNOWN;
-    req.partial = 0;
-    req.request_id = 0;
-    req.ns = 0;
+    int zero = 0;
+    struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
+    if (!req) {
+        return 0;
+    }
+    req->protocol = PROTOCOL_UNKNOWN;
+    req->partial = 0;
+    req->request_id = 0;
+    req->ns = 0;
     struct socket_key k = {};
     k.pid = id >> 32;
     k.fd = fd;
     k.stream_id = -1;
+
     if (is_http_request(buf)) {
-        req.protocol = PROTOCOL_HTTP;
-    } else if (is_postgres_query(buf, size)) {
-        req.protocol = PROTOCOL_POSTGRES;
+        req->protocol = PROTOCOL_HTTP;
+    } else if (is_postgres_query(buf, size, &req->request_type)) {
+        if (req->request_type == POSTGRES_FRAME_CLOSE) {
+            struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+            if (!e) {
+                return 0;
+            }
+            e->protocol = PROTOCOL_POSTGRES;
+            e->fd = k.fd;
+            e->pid = k.pid;
+            e->method = METHOD_STATEMENT_CLOSE;
+            e->status = 200;
+            e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+            bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
+            bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+            return 0;
+        }
+        req->protocol = PROTOCOL_POSTGRES;
     } else if (is_redis_query(buf)) {
-        req.protocol = PROTOCOL_REDIS;
+        req->protocol = PROTOCOL_REDIS;
     } else if (is_memcached_query(buf, size)) {
-        req.protocol = PROTOCOL_MEMCACHED;
-    } else if (is_mysql_query(buf, size)) {
-        req.protocol = PROTOCOL_MYSQL;
+        req->protocol = PROTOCOL_MEMCACHED;
+    } else if (is_mysql_query(buf, size, &req->request_type)) {
+        if (req->request_type == MYSQL_COM_STMT_CLOSE) {
+            struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+            if (!e) {
+                return 0;
+            }
+            e->protocol = PROTOCOL_MYSQL;
+            e->fd = k.fd;
+            e->pid = k.pid;
+            e->method = METHOD_STATEMENT_CLOSE;
+            e->status = 200;
+            e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+            bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
+            bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+            return 0;
+        }
+        req->protocol = PROTOCOL_MYSQL;
     } else if (is_mongo_query(buf, size)) {
-        req.protocol = PROTOCOL_MONGO;
+        req->protocol = PROTOCOL_MONGO;
     } else if (is_rabbitmq_produce(buf, size)) {
-        struct l7_event e = {};
-        e.protocol = PROTOCOL_RABBITMQ;
-        e.fd = k.fd;
-        e.pid = k.pid;
-        e.status = 200;
-        e.method = METHOD_PRODUCE;
-        e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+        struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+        if (!e) {
+            return 0;
+        }
+        e->protocol = PROTOCOL_RABBITMQ;
+        e->fd = k.fd;
+        e->pid = k.pid;
+        e->status = 200;
+        e->method = METHOD_PRODUCE;
+        e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+        bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
+        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
     } else {
         __s32 request_id = is_kafka_request(buf, size);
         if  (request_id > 0) {
-            req.request_id = request_id;
-            req.protocol = PROTOCOL_KAFKA;
+            req->request_id = request_id;
+            req->protocol = PROTOCOL_KAFKA;
             struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
             if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
-                req.ns = prev_req->ns;
+                req->ns = prev_req->ns;
             }
         } else {
             __s16 stream_id = is_cassandra_request(buf, size);
             if  (stream_id != -1) {
                 k.stream_id = stream_id;
-                req.protocol = PROTOCOL_CASSANDRA;
+                req->protocol = PROTOCOL_CASSANDRA;
             }
         }
     }
-    if (req.protocol == PROTOCOL_UNKNOWN) {
+    if (req->protocol == PROTOCOL_UNKNOWN) {
         return 0;
     }
-    if (req.ns == 0) {
-        req.ns = bpf_ktime_get_ns();
+    if (req->ns == 0) {
+        req->ns = bpf_ktime_get_ns();
     }
-    bpf_map_update_elem(&active_l7_requests, &k, &req, BPF_ANY);
+    bpf_probe_read(req->payload, MAX_PAYLOAD_SIZE, (void *)buf);
+    bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
     return 0;
 }
 
@@ -179,7 +241,7 @@ int trace_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
 static inline __attribute__((__always_inline__))
 int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-
+    int zero = 0;
     struct rw_args_t *args = bpf_map_lookup_elem(&active_reads, &id);
     if (!args) {
         return 0;
@@ -197,19 +259,23 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
         return 0;
     }
 
-    struct l7_event e = {};
-    e.fd = k.fd;
-    e.pid = k.pid;
-    e.connection_timestamp = 0;
-    e.status = 0;
-    e.method = METHOD_UNKNOWN;
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+        return 0;
+    }
+    e->fd = k.fd;
+    e->pid = k.pid;
+    e->connection_timestamp = 0;
+    e->status = 0;
+    e->method = METHOD_UNKNOWN;
+    e->statement_id = 0;
 
     if (is_rabbitmq_consume(buf, size)) {
-        e.protocol = PROTOCOL_RABBITMQ;
-        e.status = 200;
-        e.method = METHOD_CONSUME;
-        e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+        e->protocol = PROTOCOL_RABBITMQ;
+        e->status = 200;
+        e->method = METHOD_CONSUME;
+        e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
     }
 
@@ -226,42 +292,55 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
             return 0;
         }
     }
+
+    bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, req->payload);
     __s32 request_id = req->request_id;
-    e.protocol = req->protocol;
+    e->protocol = req->protocol;
     __u64 ns = req->ns;
     __u8 partial = req->partial;
+    __u8 request_type = req->request_type;
     bpf_map_delete_elem(&active_l7_requests, &k);
-    if (e.protocol == PROTOCOL_HTTP) {
-        e.status = parse_http_status(buf);
-    } else if (e.protocol == PROTOCOL_POSTGRES) {
-        e.status = parse_postgres_status(buf, ctx->ret);
-    } else if (e.protocol == PROTOCOL_REDIS) {
-        e.status = parse_redis_status(buf, ctx->ret);
-    } else if (e.protocol == PROTOCOL_MEMCACHED) {
-        e.status = parse_memcached_status(buf, ctx->ret);
-    } else if (e.protocol == PROTOCOL_MYSQL) {
-        e.status = parse_mysql_status(buf, ctx->ret);
-    } else if (e.protocol == PROTOCOL_MONGO) {
-        e.status = parse_mongo_status(buf, ctx->ret, partial);
-        if (e.status == 1) {
-            struct l7_request r = {};
-            r.partial = 1;
-            r.protocol = e.protocol;
-            r.ns = ns;
-            bpf_map_update_elem(&active_l7_requests, &k, &r, BPF_ANY);
+    if (e->protocol == PROTOCOL_HTTP) {
+        e->status = parse_http_status(buf);
+    } else if (e->protocol == PROTOCOL_POSTGRES) {
+        e->status = parse_postgres_status(buf, ctx->ret);
+        if (request_type == POSTGRES_FRAME_PARSE) {
+            e->method = METHOD_STATEMENT_PREPARE;
+        }
+    } else if (e->protocol == PROTOCOL_REDIS) {
+        e->status = parse_redis_status(buf, ctx->ret);
+    } else if (e->protocol == PROTOCOL_MEMCACHED) {
+        e->status = parse_memcached_status(buf, ctx->ret);
+    } else if (e->protocol == PROTOCOL_MYSQL) {
+        e->status = parse_mysql_response(buf, ctx->ret, request_type, &e->statement_id);
+        if (request_type == MYSQL_COM_STMT_PREPARE) {
+            e->method = METHOD_STATEMENT_PREPARE;
+        }
+    } else if (e->protocol == PROTOCOL_MONGO) {
+        e->status = parse_mongo_status(buf, ctx->ret, partial);
+        if (e->status == 1) {
+            struct l7_request *r = bpf_map_lookup_elem(&l7_request_heap, &zero);
+            if (!r) {
+                return 0;
+            }
+            r->partial = 1;
+            r->protocol = e->protocol;
+            r->ns = ns;
+            bpf_probe_read(r->payload, MAX_PAYLOAD_SIZE, e->payload);
+            bpf_map_update_elem(&active_l7_requests, &k, r, BPF_ANY);
             return 0;
         }
-    } else if (e.protocol == PROTOCOL_KAFKA) {
-        e.status = parse_kafka_status(request_id, buf, ctx->ret, partial);
-    } else if (e.protocol == PROTOCOL_CASSANDRA) {
-        e.status = cassandra_status(cassandra_response);
+    } else if (e->protocol == PROTOCOL_KAFKA) {
+        e->status = parse_kafka_status(request_id, buf, ctx->ret, partial);
+    } else if (e->protocol == PROTOCOL_CASSANDRA) {
+        e->status = cassandra_status(cassandra_response);
     }
-    if (e.status == 0) {
+    if (e->status == 0) {
         return 0;
     }
-    e.duration = bpf_ktime_get_ns() - ns;
-    e.connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+    e->duration = bpf_ktime_get_ns() - ns;
+    e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
     return 0;
 }
 

+ 23 - 4
ebpftracer/ebpf/l7/mysql.c

@@ -1,13 +1,16 @@
 // https://dev.mysql.com/doc/dev/mysql-server/latest/PAGE_PROTOCOL.html
 #define MYSQL_COM_QUERY		    3
-#define MYSQL_COM_STMT_EXECUTE  23
+#define MYSQL_COM_STMT_PREPARE  0x16
+#define MYSQL_COM_STMT_EXECUTE  0x17
+#define MYSQL_COM_STMT_CLOSE    0x19
 
 #define MYSQL_RESPONSE_OK    0x00
 #define MYSQL_RESPONSE_EOF   0xfe
 #define MYSQL_RESPONSE_ERROR 0xff
 
+
 static __always_inline
-int is_mysql_query(char *buf, int buf_size) {
+int is_mysql_query(char *buf, int buf_size, __u8 *request_type) {
     if (buf_size < 1) {
         return 0;
     }
@@ -22,11 +25,19 @@ int is_mysql_query(char *buf, int buf_size) {
     if (b[4] ==  MYSQL_COM_QUERY || b[4] == MYSQL_COM_STMT_EXECUTE) {
         return 1;
     }
+    if (b[4] == MYSQL_COM_STMT_CLOSE) {
+        *request_type = MYSQL_COM_STMT_CLOSE;
+        return 1;
+    }
+    if (b[4] == MYSQL_COM_STMT_PREPARE) {
+        *request_type = MYSQL_COM_STMT_PREPARE;
+        return 1;
+    }
     return 0;
 }
 
 static __always_inline
-__u32 parse_mysql_status(char *buf, int buf_size) {
+__u32 parse_mysql_response(char *buf, int buf_size, __u8 request_type, __u32 *statement_id) {
     __u8 b[5];
     if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
         return 0;
@@ -35,7 +46,15 @@ __u32 parse_mysql_status(char *buf, int buf_size) {
         return 0;
     }
     int length = (int)b[0] | (int)b[1] << 8 | (int)b[2] << 16;
-    if (length == 1 || b[4] == MYSQL_RESPONSE_OK || b[4] == MYSQL_RESPONSE_EOF) {
+    if (length == 1 || b[4] == MYSQL_RESPONSE_EOF) {
+        return 200;
+    }
+    if (b[4] == MYSQL_RESPONSE_OK ) {
+        if (request_type == MYSQL_COM_STMT_PREPARE) {
+            if (bpf_probe_read(statement_id, sizeof(*statement_id), (void *)((char *)buf+5)) < 0) {
+                return 0;
+            }
+        }
         return 200;
     }
     if (b[4] == MYSQL_RESPONSE_ERROR) {

+ 10 - 4
ebpftracer/ebpf/l7/postgres.c

@@ -2,8 +2,12 @@
 // https://www.pgcon.org/2014/schedule/attachments/330_postgres-for-the-wire.pdf
 // https://www.postgresql.org/docs/current/protocol-message-formats.html
 
+#define POSTGRES_FRAME_SIMPLE_QUERY 'Q'
+#define POSTGRES_FRAME_PARSE 'P'
+#define POSTGRES_FRAME_CLOSE 'C'
+
 static __always_inline
-int is_postgres_query(char *buf, int buf_size) {
+int is_postgres_query(char *buf, int buf_size, __u8 *request_type) {
     if (buf_size < 1) {
         return 0;
     }
@@ -16,7 +20,9 @@ int is_postgres_query(char *buf, int buf_size) {
         return 0;
     }
     f_length = bpf_htonl(f_length);
-    if (f_cmd == 'Q' && f_length+1 == buf_size) {
+
+    *request_type = f_cmd;
+    if ((f_cmd == POSTGRES_FRAME_SIMPLE_QUERY || f_cmd == POSTGRES_FRAME_CLOSE) && f_length+1 == buf_size) {
         return 1;
     }
     char sync[5];
@@ -44,7 +50,7 @@ __u32 parse_postgres_status(char *buf, int buf_size) {
     if (length+1 > buf_size) {
         return 0;
     }
-    if (cmd == '2' && length == 4 && buf_size >= 10) {
+    if ((cmd == '1' || cmd == '2') && length == 4 && buf_size >= 10) {
         if (bpf_probe_read(&cmd, sizeof(cmd), (void *)((char *)buf+5)) < 0) {
             return 0;
         }
@@ -55,7 +61,7 @@ __u32 parse_postgres_status(char *buf, int buf_size) {
     if (cmd == 'E') {
         return 500;
     }
-    if (cmd == 'T' || cmd == 'D' || cmd == 'C') {
+    if (cmd == 't' || cmd == 'T' || cmd == 'D' || cmd == 'C') {
         return 200;
     }
     return 0;

+ 40 - 22
ebpftracer/tracer.go

@@ -21,6 +21,8 @@ import (
 	"time"
 )
 
+const PayloadSize = 512
+
 type EventType uint32
 type EventReason uint32
 
@@ -79,9 +81,11 @@ func (p L7Protocol) String() string {
 type L7Method uint8
 
 const (
-	L7MethodUnknown L7Method = 0
-	L7MethodProduce L7Method = 1
-	L7MethodConsume L7Method = 2
+	L7MethodUnknown          L7Method = 0
+	L7MethodProduce          L7Method = 1
+	L7MethodConsume          L7Method = 2
+	L7MethodStatementPrepare L7Method = 3
+	L7MethodStatementClose   L7Method = 4
 )
 
 func (m L7Method) String() string {
@@ -97,10 +101,12 @@ func (m L7Method) String() string {
 }
 
 type L7Request struct {
-	Protocol L7Protocol
-	Status   int
-	Duration time.Duration
-	Method   L7Method
+	Protocol    L7Protocol
+	Status      int
+	Duration    time.Duration
+	Method      L7Method
+	StatementId uint32
+	Payload     [PayloadSize]byte
 }
 
 func (r *L7Request) StatusString() string {
@@ -196,6 +202,12 @@ func (t *Tracer) init(ch chan<- Event) error {
 	return nil
 }
 
+type perfMap struct {
+	name                  string
+	perCPUBufferSizePages int
+	event                 rawEvent
+}
+
 func (t *Tracer) ebpf(ch chan<- Event, kernelVersion string, disableL7Tracing bool) error {
 	kv := "v" + common.KernelMajorMinor(kernelVersion)
 	var prg []byte
@@ -224,25 +236,26 @@ func (t *Tracer) ebpf(ch chan<- Event, kernelVersion string, disableL7Tracing bo
 	}
 	t.collection = c
 
-	events := map[string]rawEvent{
-		"proc_events":           &procEvent{},
-		"tcp_listen_events":     &tcpEvent{},
-		"tcp_connect_events":    &tcpEvent{},
-		"tcp_retransmit_events": &tcpEvent{},
-		"file_events":           &fileEvent{},
+	perfMaps := []perfMap{
+		{name: "proc_events", event: &procEvent{}, perCPUBufferSizePages: 4},
+		{name: "tcp_listen_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
+		{name: "tcp_connect_events", event: &tcpEvent{}, perCPUBufferSizePages: 8},
+		{name: "tcp_retransmit_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
+		{name: "file_events", event: &fileEvent{}, perCPUBufferSizePages: 4},
 	}
+
 	if !disableL7Tracing {
-		events["l7_events"] = &l7Event{}
+		perfMaps = append(perfMaps, perfMap{name: "l7_events", event: &l7Event{}, perCPUBufferSizePages: 16})
 	}
 
-	for name, typ := range events {
-		r, err := perf.NewReader(t.collection.Maps[name], os.Getpagesize())
+	for _, pm := range perfMaps {
+		r, err := perf.NewReader(t.collection.Maps[pm.name], pm.perCPUBufferSizePages*os.Getpagesize())
 		if err != nil {
 			t.Close()
 			return fmt.Errorf("failed to create ebpf reader: %w", err)
 		}
-		t.readers[name] = r
-		go runEventsReader(name, r, ch, typ)
+		t.readers[pm.name] = r
+		go runEventsReader(pm.name, r, ch, pm.event)
 	}
 
 	for name, spec := range spec.Programs {
@@ -369,14 +382,19 @@ type l7Event struct {
 	Duration            uint64
 	Protocol            uint8
 	Method              uint8
+	Padding             uint16
+	StatementId         uint32
+	Payload             [PayloadSize]byte
 }
 
 func (e l7Event) Event() Event {
 	return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: &L7Request{
-		Protocol: L7Protocol(e.Protocol),
-		Status:   int(e.Status),
-		Duration: time.Duration(e.Duration),
-		Method:   L7Method(e.Method),
+		Protocol:    L7Protocol(e.Protocol),
+		Status:      int(e.Status),
+		Duration:    time.Duration(e.Duration),
+		Method:      L7Method(e.Method),
+		StatementId: e.StatementId,
+		Payload:     e.Payload,
 	}}
 }
 

+ 19 - 12
go.mod

@@ -3,7 +3,7 @@ module github.com/coroot/coroot-node-agent
 go 1.19
 
 require (
-	cloud.google.com/go/compute/metadata v0.2.1
+	cloud.google.com/go/compute/metadata v0.2.3
 	github.com/aws/aws-sdk-go-v2/config v1.18.3
 	github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19
 	github.com/cilium/cilium v1.13.0
@@ -17,12 +17,18 @@ require (
 	github.com/mdlayher/taskstats v0.0.0-00010101000000-000000000000
 	github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417
 	github.com/prometheus/client_golang v1.14.0
-	github.com/stretchr/testify v1.8.1
+	github.com/stretchr/testify v1.8.2
 	github.com/vishvananda/netlink v1.2.1-beta.2.0.20220608195807-1a118fe229fc
 	github.com/vishvananda/netns v0.0.4
 	github.com/xin053/hsperfdata v0.2.3
+	go.mongodb.org/mongo-driver v1.11.4
+	go.opentelemetry.io/otel v1.14.0
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.14.0
+	go.opentelemetry.io/otel/sdk v1.14.0
+	go.opentelemetry.io/otel/trace v1.14.0
 	golang.org/x/mod v0.7.0
-	golang.org/x/net v0.4.0
+	golang.org/x/net v0.7.0
 	golang.org/x/sys v0.6.0
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
 	gopkg.in/yaml.v2 v2.4.0
@@ -31,7 +37,7 @@ require (
 )
 
 require (
-	cloud.google.com/go/compute v1.12.1 // indirect
+	cloud.google.com/go/compute v1.15.1 // indirect
 	github.com/Microsoft/go-winio v0.5.1 // indirect
 	github.com/Microsoft/hcsshim v0.8.16 // indirect
 	github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
@@ -50,7 +56,8 @@ require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bits-and-blooms/bitset v1.2.0 // indirect
 	github.com/blang/semver/v4 v4.0.0 // indirect
-	github.com/cespare/xxhash/v2 v2.1.2 // indirect
+	github.com/cenkalti/backoff/v4 v4.2.0 // indirect
+	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/cilium/workerpool v1.1.3 // indirect
 	github.com/containerd/continuity v0.1.0 // indirect
 	github.com/containerd/fifo v1.0.0 // indirect
@@ -82,6 +89,7 @@ require (
 	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/google/go-cmp v0.5.9 // indirect
 	github.com/google/uuid v1.3.0 // indirect
+	github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/inconshreveable/mousetrap v1.0.1 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
@@ -122,18 +130,17 @@ require (
 	github.com/spf13/viper v1.14.0 // indirect
 	github.com/subosito/gotenv v1.4.1 // indirect
 	github.com/yusufpapurcu/wmi v1.2.2 // indirect
-	go.mongodb.org/mongo-driver v1.11.2 // indirect
-	go.opencensus.io v0.23.0 // indirect
-	go.opentelemetry.io/otel v1.11.2 // indirect
-	go.opentelemetry.io/otel/trace v1.11.2 // indirect
+	go.opencensus.io v0.24.0 // indirect
+	go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
+	go.opentelemetry.io/proto/otlp v0.19.0 // indirect
 	go4.org/intern v0.0.0-20210108033219-3eb7198706b2 // indirect
 	go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
 	golang.org/x/exp v0.0.0-20221106115401-f9659909a136 // indirect
 	golang.org/x/sync v0.1.0 // indirect
-	golang.org/x/text v0.5.0 // indirect
+	golang.org/x/text v0.7.0 // indirect
 	golang.org/x/time v0.2.0 // indirect
-	google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e // indirect
-	google.golang.org/grpc v1.51.0 // indirect
+	google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
+	google.golang.org/grpc v1.53.0 // indirect
 	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect

+ 65 - 25
go.sum

@@ -24,10 +24,10 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf
 cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
 cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
 cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
-cloud.google.com/go/compute v1.12.1 h1:gKVJMEyqV5c/UnpzjjQbo3Rjvvqpr9B1DFSbJC4OXr0=
-cloud.google.com/go/compute v1.12.1/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU=
-cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48=
-cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
+cloud.google.com/go/compute v1.15.1 h1:7UGq3QknM33pw5xATlpzeoomNxsacIVvTqTTvbfajmE=
+cloud.google.com/go/compute v1.15.1/go.mod h1:bjjoF/NtFUrkD/urWfdHaKuOPDR5nWIs63rR+SXhcpA=
+cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
+cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
 cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
 cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
@@ -88,6 +88,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
 github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0=
+github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
 github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
@@ -137,11 +138,14 @@ github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7
 github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
 github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50=
 github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
+github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
+github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
 github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/checkpoint-restore/go-criu/v4 v4.1.0/go.mod h1:xUQBLp4RLc5zJtWY++yjOoMoB5lihDt7fai+75m+rGw=
 github.com/checkpoint-restore/go-criu/v5 v5.0.0/go.mod h1:cfwC0EG7HMUenopBsUf9d89JlCLQIfgVcNsNN0t6T2M=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
@@ -163,6 +167,11 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
 github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
+github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
 github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
 github.com/containerd/aufs v0.0.0-20200908144142-dab0cbea06f4/go.mod h1:nukgQABAEopAHvB6j7cnP5zJ+/3aVcE7hCYqvIwAHyE=
 github.com/containerd/aufs v0.0.0-20201003224125-76a6863f2989/go.mod h1:AkGGQs9NM2vtYHaUen+NljV0/baGCAPELGm2q9ZXpWU=
@@ -308,6 +317,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
 github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
 github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
+github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
@@ -427,6 +438,8 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
+github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
 github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -515,6 +528,9 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de
 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
 github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
 github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
+github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0=
+github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
 github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
@@ -771,6 +787,7 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5
 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
+github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -840,8 +857,9 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs=
 github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
 github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
@@ -899,8 +917,8 @@ go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3C
 go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg=
 go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng=
 go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8=
-go.mongodb.org/mongo-driver v1.11.2 h1:+1v2rDQUWNcGW7/7E0Jvdz51V38XXxJfhzbV17aNHCw=
-go.mongodb.org/mongo-driver v1.11.2/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
+go.mongodb.org/mongo-driver v1.11.4 h1:4ayjakA013OdpGyL2K3ZqylTac/rMjrJOMZ1EHizXas=
+go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
 go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
@@ -908,13 +926,23 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
-go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
-go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
-go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0=
-go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
-go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs=
-go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0=
-go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA=
+go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
+go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
+go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM=
+go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU=
+go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 h1:/fXHZHGvro6MVqV34fJzDhi7sHGpX3Ej/Qjmfn003ho=
+go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0/go.mod h1:UFG7EBMRdXyFstOwH028U0sVf+AvukSGhF0g8+dmNG8=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 h1:TKf2uAs2ueguzLaxOCBXNpHxfO/aC7PAdDsSH0IbeRQ=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0/go.mod h1:HrbCVv40OOLTABmOn1ZWty6CHXkU8DK/Urc43tHug70=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.14.0 h1:3jAYbRHQAqzLjd9I4tzxwJ8Pk/N6AqBcF6m1ZHrxG94=
+go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.14.0/go.mod h1:+N7zNjIJv4K+DeX67XXET0P+eIciESgaFDBqh+ZJFS4=
+go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY=
+go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM=
+go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M=
+go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8=
+go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
+go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
+go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
 go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@@ -1025,6 +1053,7 @@ golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
 golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@@ -1032,8 +1061,8 @@ golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
-golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
+golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
+golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1044,8 +1073,9 @@ golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
-golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk=
+golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -1141,10 +1171,12 @@ golang.org/x/sys v0.0.0-20210216163648-f7da38b97c65/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -1160,7 +1192,7 @@ golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=
+golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -1170,8 +1202,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
-golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -1295,6 +1327,7 @@ google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfG
 google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
 google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
 google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
+google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
 google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
 google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
@@ -1309,8 +1342,9 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D
 google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
-google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e h1:S9GbmC1iCgvbLyAokVCwiO6tVIrU9Y7c5oMx1V/ki/Y=
-google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e/go.mod h1:9qHF0xnpdSfF6knlcsnpzUu5y+rpwgbvsyGAZPBMg4s=
+google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
 google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
@@ -1328,11 +1362,15 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
 google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
 google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
 google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
+google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
 google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
 google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
 google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
-google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
-google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
+google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
+google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
+google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
+google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -1345,6 +1383,7 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
 google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
@@ -1373,6 +1412,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
 gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

+ 6 - 1
main.go

@@ -6,6 +6,7 @@ import (
 	"github.com/coroot/coroot-node-agent/containers"
 	"github.com/coroot/coroot-node-agent/flags"
 	"github.com/coroot/coroot-node-agent/node"
+	"github.com/coroot/coroot-node-agent/tracing"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"golang.org/x/mod/semver"
@@ -89,8 +90,12 @@ func main() {
 	if semver.Compare("v"+ver, "v"+minSupportedKernelVersion) == -1 {
 		klog.Exitf("the minimum Linux kernel version required is %s or later", minSupportedKernelVersion)
 	}
+
+	machineId := machineID()
+	tracing.Init(machineId, hostname, version)
+
 	registry := prometheus.NewRegistry()
-	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineID()}, registry)
+	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry)
 
 	registerer.MustRegister(info("node_agent_info", version))
 

+ 45 - 0
tracing/http.go

@@ -0,0 +1,45 @@
+package tracing
+
+import (
+	"bytes"
+	"fmt"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"inet.af/netaddr"
+	"time"
+)
+
+func handleHttpRequest(start, end time.Time, r *ebpftracer.L7Request, dest netaddr.IPPort, attrs []attribute.KeyValue) {
+	method, path := parseHttp(r.Payload[:])
+	if method == "" {
+		return
+	}
+	_, span := tracer.Start(nil, method, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	span.SetAttributes(append(
+		attrs,
+		semconv.HTTPURL(fmt.Sprintf("http://%s%s", dest.String(), path)),
+		semconv.HTTPMethod(method),
+		semconv.HTTPSchemeHTTP,
+		semconv.HTTPStatusCode(r.Status),
+	)...)
+	if r.Status >= 400 {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+func parseHttp(payload []byte) (string, string) {
+	// the HTTP method is being validated in the eBPF code, confirming that the request is an HTTP request
+	method, rest, ok := bytes.Cut(payload, space)
+	if !ok {
+		return "", ""
+	}
+	uri, _, ok := bytes.Cut(rest, space)
+	if !ok {
+		uri = append(uri, []byte("...")...)
+	}
+	return string(method), string(uri)
+}

+ 62 - 0
tracing/memcached.go

@@ -0,0 +1,62 @@
+package tracing
+
+import (
+	"bytes"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"strings"
+	"time"
+)
+
+const (
+	MemcacheDBItemKeyName attribute.Key = "db.memcached.item"
+)
+
+func handleMemcachedQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue) {
+	cmd, items := parseMemcached(r.Payload[:])
+	if cmd == "" {
+		return
+	}
+	_, span := tracer.Start(nil, cmd, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	if len(items) == 1 {
+		attrs = append(attrs, MemcacheDBItemKeyName.String(items[0]))
+	} else if len(items) > 1 {
+		attrs = append(attrs, MemcacheDBItemKeyName.StringSlice(items))
+	}
+	span.SetAttributes(append(attrs, semconv.DBSystemMemcached, semconv.DBOperation(cmd))...)
+	if r.Status == 500 {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+func parseMemcached(payload []byte) (string, []string) {
+	cmd, rest, ok := bytes.Cut(payload, space)
+	if !ok {
+		return "", nil
+	}
+	command := string(cmd)
+	switch command {
+	case "set", "add", "cas", "append", "prepend", "replace", "delete", "incr", "decr", "touch":
+		if key, _, ok := bytes.Cut(rest, space); ok {
+			return command, []string{string(key)}
+		}
+	case "gat", "gats":
+		_, rest, ok = bytes.Cut(rest, space)
+		if ok {
+			keys, _, ok := bytes.Cut(rest, crlf)
+			if ok {
+				return command, strings.Split(string(keys), " ")
+			}
+		}
+	case "get", "gets":
+		keys, _, ok := bytes.Cut(rest, crlf)
+		if ok {
+			return command, strings.Split(string(keys), " ")
+		}
+	}
+	return "", nil
+}

+ 65 - 0
tracing/mongo.go

@@ -0,0 +1,65 @@
+package tracing
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"io"
+	"time"
+)
+
+const (
+	MongoOpMSG = 2013
+)
+
+func handleMongoQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue) {
+	query := parseMongo(r.Payload[:])
+	if query == "" {
+		return
+	}
+	_, span := tracer.Start(nil, "query", trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	span.SetAttributes(append(attrs, semconv.DBSystemMongoDB, semconv.DBStatement(query))...)
+	if r.Status == 500 {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+type mongoMsgHeader struct {
+	MessageLength int32
+	RequestID     int32
+	ResponseTo    int32
+	OpCode        int32
+}
+
+func parseMongo(payload []byte) string {
+	h := &mongoMsgHeader{}
+	reader := bufio.NewReader(bytes.NewReader(payload))
+	if err := binary.Read(reader, binary.LittleEndian, h); err != nil {
+		return ""
+	}
+	if h.OpCode != MongoOpMSG {
+		return ""
+	}
+	if _, err := reader.Discard(4); err != nil { //flagBits
+		return ""
+	}
+	if sectionKind, err := reader.ReadByte(); err != nil || sectionKind != 0 {
+		return ""
+	}
+	return bsonToString(reader)
+}
+
+func bsonToString(r io.Reader) string {
+	raw, err := bson.NewFromIOReader(r)
+	if err != nil {
+		return "<truncated>"
+	}
+	return raw.String()
+}

+ 81 - 0
tracing/mysql.go

@@ -0,0 +1,81 @@
+package tracing
+
+import (
+	"encoding/binary"
+	"fmt"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"strconv"
+	"time"
+)
+
+const (
+	MysqlComQuery       = 3
+	MysqlComStmtPrepare = 0x16
+	MysqlComStmtExecute = 0x17
+	MysqlComStmtClose   = 0x19
+	mysqlMsgHeaderSize  = 4
+)
+
+func handleMysqlQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue, preparedStatements map[string]string) {
+	query := parseMysql(r.Payload[:], r.StatementId, preparedStatements)
+	if query == "" {
+		return
+	}
+
+	_, span := tracer.Start(nil, "query", trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	span.SetAttributes(append(attrs, semconv.DBSystemMySQL, semconv.DBStatement(query))...)
+	if r.Status == 500 {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+func parseMysql(payload []byte, statementId uint32, preparedStatements map[string]string) string {
+	payloadSize := len(payload)
+	if payloadSize < mysqlMsgHeaderSize+5 {
+		return ""
+	}
+	msgSize := int(payload[0]) | int(payload[1])<<8 | int(payload[2])<<16
+	cmd := payload[4]
+	readQuery := func() (query string) {
+		to := mysqlMsgHeaderSize + msgSize
+		partial := false
+		if to > payloadSize-1 {
+			to = payloadSize - 1
+			partial = true
+		}
+		query = string(payload[mysqlMsgHeaderSize+1 : to])
+		if partial {
+			query += "..."
+		}
+		return query
+	}
+	readStatementId := func() string {
+		return strconv.FormatUint(uint64(binary.LittleEndian.Uint32(payload[mysqlMsgHeaderSize+1:])), 10)
+	}
+
+	switch cmd {
+	case MysqlComQuery:
+		return readQuery()
+	case MysqlComStmtExecute:
+		statementIdStr := readStatementId()
+		statement, ok := preparedStatements[statementIdStr]
+		if !ok {
+			statement = fmt.Sprintf(`EXECUTE %s /* unknown */`, statementIdStr)
+		}
+		return statement
+	case MysqlComStmtPrepare:
+		query := readQuery()
+		statementIdStr := strconv.FormatUint(uint64(statementId), 10)
+		preparedStatements[statementIdStr] = query
+		return fmt.Sprintf("PREPARE %s FROM %s", statementIdStr, query)
+	case MysqlComStmtClose:
+		statementIdStr := readStatementId()
+		delete(preparedStatements, statementIdStr)
+	}
+	return ""
+}

+ 93 - 0
tracing/postgres.go

@@ -0,0 +1,93 @@
+package tracing
+
+import (
+	"bytes"
+	"fmt"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"time"
+)
+
+const (
+	PostgresFrameQuery byte = 'Q'
+	PostgresFrameBind  byte = 'B'
+	PostgresFrameParse byte = 'P'
+	PostgresFrameClose byte = 'C'
+)
+
+func handlePostgresQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue, preparedStatements map[string]string) {
+	query := parsePostgres(r.Payload[:], preparedStatements)
+	if query == "" {
+		return
+	}
+	_, span := tracer.Start(nil, "query", trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	span.SetAttributes(append(attrs, semconv.DBSystemPostgreSQL, semconv.DBStatement(query))...)
+	if r.Status == 500 {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+func parsePostgres(payload []byte, preparedStatements map[string]string) string {
+	l := len(payload)
+	if l < 5 {
+		return ""
+	}
+	cmd := payload[0]
+	switch cmd {
+	case PostgresFrameQuery:
+		var query string
+		if q, _, ok := bytes.Cut(payload[5:], []byte{0}); ok {
+			query = string(q)
+		} else {
+			query = string(q) + "..."
+		}
+		return query
+	case PostgresFrameBind:
+		_, rest, ok := bytes.Cut(payload[5:], []byte{0})
+		if !ok {
+			return ""
+		}
+		preparedStatementName, _, ok := bytes.Cut(rest, []byte{0})
+		if !ok {
+			return ""
+		}
+		preparedStatementNameStr := string(preparedStatementName)
+		statement, ok := preparedStatements[preparedStatementNameStr]
+		if !ok {
+			statement = fmt.Sprintf(`EXECUTE %s /* unknown */`, preparedStatementNameStr)
+		}
+		return statement
+	case PostgresFrameParse:
+		preparedStatementName, rest, ok := bytes.Cut(payload[5:], []byte{0})
+		if !ok {
+			return ""
+		}
+		var query string
+		q, _, ok := bytes.Cut(rest, []byte{0})
+		if ok {
+			query = string(q)
+		} else {
+			query = string(q) + "..."
+		}
+		preparedStatementNameStr := string(preparedStatementName)
+		preparedStatements[preparedStatementNameStr] = query
+		return fmt.Sprintf("PREPARE %s AS %s", preparedStatementNameStr, query)
+	case PostgresFrameClose:
+		if l < 7 {
+			return ""
+		}
+		if payload[5] != 'S' {
+			return ""
+		}
+		preparedStatementName, _, ok := bytes.Cut(payload[6:], []byte{0})
+		if !ok {
+			return ""
+		}
+		delete(preparedStatements, string(preparedStatementName))
+	}
+	return ""
+}

+ 64 - 0
tracing/redis.go

@@ -0,0 +1,64 @@
+package tracing
+
+import (
+	"bytes"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"strconv"
+	"time"
+)
+
+func handleRedisQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue) {
+	cmd, args := parseRedis(r.Payload[:])
+	if cmd == "" {
+		return
+	}
+	_, span := tracer.Start(nil, cmd, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	statement := cmd
+	if args != "" {
+		statement += " " + args
+	}
+	span.SetAttributes(append(attrs, semconv.DBSystemRedis, semconv.DBOperation(cmd), semconv.DBStatement(statement))...)
+	if r.Status == 500 {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+func parseRedis(payload []byte) (cmd string, args string) {
+	var v, rest []byte
+	var ok bool
+	v, rest, ok = bytes.Cut(payload, crlf)
+	if !ok || !bytes.HasPrefix(v, []byte("*")) {
+		return
+	}
+	arrayLen, err := strconv.ParseUint(string(v[1:]), 10, 32)
+	if err != nil {
+		return
+	}
+	readString := func() string {
+		v, rest, ok = bytes.Cut(rest, crlf)
+		if !ok || !bytes.HasPrefix(v, []byte("$")) {
+			return ""
+		}
+		v, rest, ok = bytes.Cut(rest, crlf)
+		if ok {
+			return string(v)
+		}
+		return ""
+	}
+	cmd = readString()
+	if cmd == "" {
+		return
+	}
+	if arrayLen > 1 {
+		args = readString()
+		if arrayLen > 2 {
+			args += " ..."
+		}
+	}
+	return
+}

+ 80 - 0
tracing/tracing.go

@@ -0,0 +1,80 @@
+package tracing
+
+import (
+	"context"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
+	"go.opentelemetry.io/otel/sdk/resource"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"inet.af/netaddr"
+	"k8s.io/klog/v2"
+	"os"
+	"time"
+)
+
+var (
+	tracer trace.Tracer
+	space  = []byte{' '}
+	crlf   = []byte{'\r', '\n'}
+)
+
+func Init(machineId, hostname, version string) {
+	endpoint := os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
+	if endpoint == "" {
+		klog.Infoln("no OpenTelemetry collector endpoint configured")
+		return
+	}
+	klog.Infoln("OpenTelemetry collector endpoint:", endpoint)
+
+	client := otlptracehttp.NewClient()
+	exporter, err := otlptrace.New(context.Background(), client)
+	if err != nil {
+		klog.Exitln(err)
+	}
+	tracerProvider := sdktrace.NewTracerProvider(
+		sdktrace.WithBatcher(exporter),
+		sdktrace.WithResource(resource.NewWithAttributes(
+			semconv.SchemaURL,
+			semconv.ServiceName("coroot-node-agent"),
+			semconv.HostName(hostname),
+			semconv.HostID(machineId),
+		)),
+	)
+	otel.SetTracerProvider(tracerProvider)
+	tracer = tracerProvider.Tracer("coroot-node-agent", trace.WithInstrumentationVersion(version))
+}
+
+func HandleL7Request(containerId string, dest netaddr.IPPort, r *ebpftracer.L7Request, preparedStatements map[string]string) {
+	if tracer == nil {
+		return
+	}
+	end := time.Now()
+	start := end.Add(-r.Duration)
+
+	attrs := []attribute.KeyValue{
+		semconv.ContainerID(containerId),
+		semconv.NetPeerName(dest.IP().String()),
+		semconv.NetPeerPort(int(dest.Port())),
+	}
+	switch r.Protocol {
+	case ebpftracer.L7ProtocolHTTP:
+		handleHttpRequest(start, end, r, dest, attrs)
+	case ebpftracer.L7ProtocolMemcached:
+		handleMemcachedQuery(start, end, r, attrs)
+	case ebpftracer.L7ProtocolRedis:
+		handleRedisQuery(start, end, r, attrs)
+	case ebpftracer.L7ProtocolPostgres:
+		handlePostgresQuery(start, end, r, attrs, preparedStatements)
+	case ebpftracer.L7ProtocolMysql:
+		handleMysqlQuery(start, end, r, attrs, preparedStatements)
+	case ebpftracer.L7ProtocolMongo:
+		handleMongoQuery(start, end, r, attrs)
+	default:
+		return
+	}
+}

+ 63 - 0
tracing/tracing_test.go

@@ -0,0 +1,63 @@
+package tracing
+
+import (
+	"bytes"
+	"github.com/stretchr/testify/assert"
+	"go.mongodb.org/mongo-driver/bson"
+	"testing"
+)
+
+func Test_parseHttp(t *testing.T) {
+	m, p := parseHttp([]byte(`HEAD /1 HTTP/1.1\nHost: 127.0.0.1\nUser-Agent: curl/8.0.1\nAccept: */*\n\nxzxxxxxxzx`))
+	assert.Equal(t, "HEAD", m)
+	assert.Equal(t, "/1", p)
+
+	m, p = parseHttp([]byte(`GET /too-long-uri`))
+	assert.Equal(t, "GET", m)
+	assert.Equal(t, "/too-long-uri...", p)
+}
+
+func Test_parseMemcached(t *testing.T) {
+	cmd, items := parseMemcached(append([]byte(`incr 1111 2222`), '\r', '\n'))
+	assert.Equal(t, "incr", cmd)
+	assert.Equal(t, []string{"1111"}, items)
+
+	cmd, items = parseMemcached(append([]byte(`gets 1111 2222 3333`), '\r', '\n'))
+	assert.Equal(t, "gets", cmd)
+	assert.Equal(t, []string{"1111", "2222", "3333"}, items)
+}
+
+func Test_parseRedis(t *testing.T) {
+	cmd, args := parseRedis([]byte{
+		'*', '3', '\r', '\n',
+		'$', '4', '\r', '\n',
+		'L', 'L', 'E', 'N', '\r', '\n',
+		'$', '6', '\r', '\n',
+		'm', 'y', 'l', 'i', 's', 't', '\r', '\n',
+		'$', '2', '\r', '\n',
+		'x', 'y', '\r', '\n',
+	})
+	assert.Equal(t, "LLEN", cmd)
+	assert.Equal(t, "mylist ...", args)
+
+	cmd, args = parseRedis([]byte{
+		'*', '2', '\r', '\n',
+		'$', '8', '\r', '\n',
+		'S', 'M', 'E', 'M', 'B', 'E', 'R', 'S', '\r', '\n',
+		'$', '6', '\r', '\n',
+		'm', 'y', 'l', 'i', 's', 't', '\r', '\n',
+	})
+
+	assert.Equal(t, "SMEMBERS", cmd)
+	assert.Equal(t, "mylist", args)
+}
+
+func Test_parseMongo(t *testing.T) {
+	v := bson.M{"a": "bssssssssssssssssssssssssssssssssssssssssss"}
+	buf := make([]byte, 1024)
+	data, err := bson.Marshal(v)
+	assert.NoError(t, err)
+	copy(buf, data)
+	assert.Equal(t, `{"a": "bssssssssssssssssssssssssssssssssssssssssss"}`, bsonToString(bytes.NewReader(buf)))
+	assert.Equal(t, `<truncated>`, bsonToString(bytes.NewReader(buf[:20])))
+}

Some files were not shown because too many files changed in this diff