|
|
@@ -5,6 +5,7 @@
|
|
|
#include "mysql.c"
|
|
|
#include "mongo.c"
|
|
|
#include "kafka.c"
|
|
|
+#include "cassandra.c"
|
|
|
|
|
|
|
|
|
#define PROTOCOL_UNKNOWN 0
|
|
|
@@ -15,6 +16,7 @@
|
|
|
#define PROTOCOL_MYSQL 5
|
|
|
#define PROTOCOL_MONGO 6
|
|
|
#define PROTOCOL_KAFKA 7
|
|
|
+#define PROTOCOL_CASSANDRA 8
|
|
|
|
|
|
struct l7_event {
|
|
|
__u64 fd;
|
|
|
@@ -46,6 +48,7 @@ struct {
|
|
|
struct socket_key {
|
|
|
__u64 fd;
|
|
|
__u32 pid;
|
|
|
+ __s16 stream_id;
|
|
|
};
|
|
|
|
|
|
struct l7_request {
|
|
|
@@ -92,6 +95,7 @@ int trace_enter_write(__u64 fd, char *buf, __u64 size) {
|
|
|
struct socket_key k = {};
|
|
|
k.pid = id >> 32;
|
|
|
k.fd = fd;
|
|
|
+ k.stream_id = -1;
|
|
|
if (is_http_request(buf)) {
|
|
|
req.protocol = PROTOCOL_HTTP;
|
|
|
} else if (is_postgres_query(buf, size)) {
|
|
|
@@ -114,6 +118,10 @@ int trace_enter_write(__u64 fd, char *buf, __u64 size) {
|
|
|
req.ns = prev_req->ns;
|
|
|
}
|
|
|
}
|
|
|
+ k.stream_id = is_cassandra_request(buf, size);
|
|
|
+ if (k.stream_id != -1) {
|
|
|
+ req.protocol = PROTOCOL_CASSANDRA;
|
|
|
+ }
|
|
|
}
|
|
|
if (req.protocol == PROTOCOL_UNKNOWN) {
|
|
|
return 0;
|
|
|
@@ -147,16 +155,26 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
|
|
|
struct socket_key k = {};
|
|
|
k.pid = id >> 32;
|
|
|
k.fd = args->fd;
|
|
|
+ k.stream_id = -1;
|
|
|
buf = args->buf;
|
|
|
|
|
|
bpf_map_delete_elem(&active_reads, &id);
|
|
|
if (ctx->ret <= 0) {
|
|
|
return 0;
|
|
|
}
|
|
|
+ struct cassandra_header cassandra_response = {};
|
|
|
+ cassandra_response.stream_id = -1;
|
|
|
|
|
|
struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
|
|
|
if (!req) {
|
|
|
- return 0;
|
|
|
+ if (bpf_probe_read(&cassandra_response, sizeof(cassandra_response), (void *)(buf)) < 0) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ k.stream_id = cassandra_response.stream_id;
|
|
|
+ req = bpf_map_lookup_elem(&active_l7_requests, &k);
|
|
|
+ if (!req) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
}
|
|
|
__s32 request_id = req->request_id;
|
|
|
struct l7_event e = {};
|
|
|
@@ -189,6 +207,8 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
|
|
|
}
|
|
|
} else if (e.protocol == PROTOCOL_KAFKA) {
|
|
|
e.status = parse_kafka_status(request_id, buf, ctx->ret, partial);
|
|
|
+ } else if (e.protocol == PROTOCOL_CASSANDRA) {
|
|
|
+ e.status = cassandra_status(cassandra_response);
|
|
|
}
|
|
|
if (e.status == 0) {
|
|
|
return 0;
|