// l7.c
  /* L7 protocol identifiers (stored in l7_event.protocol and l7_request.protocol). */
  #define PROTOCOL_UNKNOWN    0
  #define PROTOCOL_HTTP       1
  #define PROTOCOL_POSTGRES   2
  #define PROTOCOL_REDIS      3
  #define PROTOCOL_MEMCACHED  4
  #define PROTOCOL_MYSQL      5
  #define PROTOCOL_MONGO      6
  #define PROTOCOL_KAFKA      7
  #define PROTOCOL_CASSANDRA  8
  #define PROTOCOL_RABBITMQ   9
  #define PROTOCOL_NATS       10
  #define PROTOCOL_HTTP2      11
  #define PROTOCOL_DUBBO2     12
  /* Marks trace start/end events emitted for HTTP traffic. */
  #define PROTOCOL_TRACE      200

  /* Values for l7_event.status. */
  #define STATUS_UNKNOWN      0
  #define STATUS_OK           200
  #define STATUS_FAILED       500

  /* Values for l7_event.method. */
  #define METHOD_UNKNOWN              0
  #define METHOD_PRODUCE              1
  #define METHOD_CONSUME              2
  #define METHOD_STATEMENT_PREPARE    3
  #define METHOD_STATEMENT_CLOSE      4
  #define METHOD_HTTP2_CLIENT_FRAMES  5
  #define METHOD_HTTP2_SERVER_FRAMES  6
  /* Capacity of every captured payload buffer. */
  #define MAX_PAYLOAD_SIZE 1024 // must be power of 2

  /*
   * Clamp `size` in place to at most MAX_PAYLOAD_SIZE-1.
   * `size` must be a modifiable lvalue: it is assigned by the MIN() clamp and
   * then AND-ed with the same mask through inline asm.
   * NOTE(review): the asm AND is presumably there so the BPF verifier sees a
   * bounded length that the compiler cannot optimise away — confirm.
   */
  #define TRUNCATE_PAYLOAD_SIZE(size) ({ \
      size = MIN(size, MAX_PAYLOAD_SIZE-1); \
      asm volatile ("%0 &= %1" : "+r"(size) : "i"(MAX_PAYLOAD_SIZE-1)); \
  })

  /*
   * Truncate `size` (modified in place) and copy that many bytes from `src`
   * to `dst` with bpf_probe_read(). On a failed read this executes `return 0`
   * in the ENCLOSING function, so it may only be used in functions that
   * return an integer.
   */
  #define COPY_PAYLOAD(dst, size, src) ({ \
      TRUNCATE_PAYLOAD_SIZE(size); \
      if (bpf_probe_read(dst, size, src)) { \
          return 0; \
      } \
  })

  #define NS_PER_SEC 1000000000ULL

  /*
   * Size of the buffer that iovec chunks are gathered into.
   * Parenthesised so the macro behaves as a single value inside larger
   * expressions (e.g. `x / IOVEC_BUF_SIZE`).
   */
  #define IOVEC_BUF_SIZE (MAX_PAYLOAD_SIZE * 2) // must be double of MAX_PAYLOAD_SIZE

  /* Upper bound on the number of iovec entries inspected per call. */
  #define MAX_IOVEC_SIZE 32
  39. #include "http.c"
  40. #include "postgres.c"
  41. #include "redis.c"
  42. #include "memcached.c"
  43. #include "mysql.c"
  44. #include "mongo.c"
  45. #include "kafka.c"
  46. #include "cassandra.c"
  47. #include "rabbitmq.c"
  48. #include "nats.c"
  49. #include "http2.c"
  50. #include "dubbo2.c"
  51. #include "apm_trace.c"
  52. struct l7_event {
  53. __u64 fd;
  54. __u64 connection_timestamp;
  55. __u32 pid;
  56. __u32 status;
  57. __u64 duration;
  58. __u8 protocol;
  59. __u8 method;
  60. __u16 padding;
  61. __u32 statement_id;
  62. __u64 payload_size;
  63. __u64 trace_id;
  64. __u32 trace_start;
  65. __u32 trace_end;
  66. // __u32 test_id;
  67. char payload[MAX_PAYLOAD_SIZE];
  68. };
  69. struct test_t {
  70. __u32 test_id;
  71. };
  72. struct {
  73. __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
  74. __type(key, int);
  75. __type(value, struct l7_event);
  76. __uint(max_entries, 1);
  77. } l7_event_heap SEC(".maps");
  78. struct {
  79. __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
  80. __type(key, int);
  81. __type(value, struct test_t);
  82. __uint(max_entries, 1);
  83. } test_heap SEC(".maps");
  84. struct {
  85. __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
  86. __uint(key_size, sizeof(int));
  87. __uint(value_size, sizeof(int));
  88. } l7_events SEC(".maps");
  89. struct read_args {
  90. __u64 fd;
  91. char* buf;
  92. __u64* ret;
  93. __u64 iovlen;
  94. };
  95. struct {
  96. __uint(type, BPF_MAP_TYPE_HASH);
  97. __uint(key_size, sizeof(__u64));
  98. __uint(value_size, sizeof(struct read_args));
  99. __uint(max_entries, 10240);
  100. } active_reads SEC(".maps");
  101. struct l7_request_key {
  102. __u64 fd;
  103. __u32 pid;
  104. __u16 is_tls;
  105. __s16 stream_id;
  106. };
  107. struct l7_request {
  108. __u64 ns;
  109. __u8 protocol;
  110. __u8 partial;
  111. __u8 request_type;
  112. __s32 request_id;
  113. __u64 payload_size;
  114. char payload[MAX_PAYLOAD_SIZE];
  115. };
  116. struct {
  117. __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
  118. __type(key, int);
  119. __type(value, struct l7_request);
  120. __uint(max_entries, 1);
  121. } l7_request_heap SEC(".maps");
  122. struct {
  123. __uint(type, BPF_MAP_TYPE_LRU_HASH);
  124. __uint(key_size, sizeof(struct l7_request_key));
  125. __uint(value_size, sizeof(struct l7_request));
  126. __uint(max_entries, 32768);
  127. } active_l7_requests SEC(".maps");
  128. struct {
  129. __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
  130. __type(key, int);
  131. __type(value, char[IOVEC_BUF_SIZE]);
  132. __uint(max_entries, 1);
  133. } iovec_buf_heap SEC(".maps");
  134. struct trace_event_raw_sys_enter_rw__stub {
  135. __u64 unused;
  136. long int id;
  137. __u64 fd;
  138. char* buf;
  139. __u64 size;
  140. };
  141. struct trace_event_raw_sys_exit_rw__stub {
  142. __u64 unused;
  143. long int id;
  144. long int ret;
  145. };
  146. struct l7_iovec {
  147. char* buf;
  148. __u64 size;
  149. };
  150. struct l7_user_msghdr {
  151. void *msg_name;
  152. int msg_namelen;
  153. struct l7_iovec *msg_iov;
  154. __u64 msg_iovlen;
  155. };
  156. static inline __attribute__((__always_inline__))
  157. void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
  158. struct sk_info sk = {};
  159. sk.pid = pid;
  160. sk.fd = fd;
  161. __u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
  162. if (timestamp) {
  163. if (*timestamp == 0) {
  164. return;
  165. }
  166. e->connection_timestamp = *timestamp;
  167. } else {
  168. e->connection_timestamp = 0;
  169. }
  170. e->fd = fd;
  171. e->pid = pid;
  172. bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
  173. }
  174. static inline __attribute__((__always_inline__))
  175. __u64 read_iovec(char *l7_iovec, __u64 iovlen, __u64 ret, char *buf) {
  176. struct l7_iovec iov = {};
  177. __u64 max = (ret) ? MIN(ret, MAX_PAYLOAD_SIZE) : MAX_PAYLOAD_SIZE;
  178. __u64 offset = 0;
  179. __u64 size = 0;
  180. #pragma unroll
  181. for (int i = 0; i < MAX_IOVEC_SIZE; i++) {
  182. if (i >= iovlen) {
  183. break;
  184. }
  185. if (bpf_probe_read(&iov, sizeof(iov), (void *)(l7_iovec+i*sizeof(iov)))) {
  186. return 0;
  187. }
  188. if (iov.size <= 0) {
  189. continue;
  190. }
  191. size = MIN(iov.size, max-offset);
  192. TRUNCATE_PAYLOAD_SIZE(size);
  193. TRUNCATE_PAYLOAD_SIZE(offset);
  194. if (bpf_probe_read(buf + offset, size, (void *)iov.buf)) {
  195. return 0;
  196. }
  197. offset += size;
  198. if (offset >= max) {
  199. break;
  200. }
  201. }
  202. return offset;
  203. }
  204. static inline __attribute__((__always_inline__))
  205. int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, __u64 iovlen) {
  206. __u64 id = bpf_get_current_pid_tgid();
  207. __u32 zero = 0;
  208. __u32 pid, tid;
  209. __u32 http_status ;
  210. pid = id >> 32;
  211. tid = (__u32)id;
  212. if (load_filter_pid() != 0 && pid != load_filter_pid()) {
  213. return 0;
  214. }
  215. char* payload = buf;
  216. if (iovlen) {
  217. payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
  218. if (!payload) {
  219. return 0;
  220. }
  221. size = read_iovec(buf, iovlen, 0, payload);
  222. }
  223. if (!size) {
  224. return 0;
  225. }
  226. struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
  227. if (!req) {
  228. return 0;
  229. }
  230. req->protocol = PROTOCOL_UNKNOWN;
  231. req->partial = 0;
  232. req->request_id = 0;
  233. req->ns = 0;
  234. req->payload_size = size;
  235. struct l7_request_key k = {};
  236. k.pid = id >> 32;
  237. k.fd = fd;
  238. k.is_tls = is_tls;
  239. k.stream_id = -1;
  240. // cw_bpf_debug("enter-payload:%s|type:%s|FD:%d\n",payload,"type",k.fd);
  241. if (is_http_response(payload, &http_status))
  242. {
  243. // __u64 goid = get_rw_goid(120 * NS_PER_SEC, 1);
  244. // cw_bpf_debug("[Response][HTTP]:thread_id:%d|goid:%d|FD:%d\n", tid, goid, k.fd);
  245. // struct trace_key_t trace_key = get_trace_key(pid, tid);
  246. // struct fd_trace_key_t fd_trace_key = get_fd_trace_key(pid, fd);
  247. __u64 trace_id = get_fd_trace_id(pid, fd);
  248. // cw_bpf_debug("trace_id:%llu", trace_id);
  249. cw_bpf_debug("[trace end][Response][HTTP] pid:%d,fd:%d,traceid:%llu", pid, fd, trace_id);
  250. // 清除trace信息
  251. clear_trace(pid, tid, fd);
  252. // 发送事件到用户空间 start
  253. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  254. if (!e) {
  255. return 0;
  256. }
  257. struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
  258. if (!req)
  259. {
  260. // cw_bpf_debug("[Response][HTTP]:no req-----------");
  261. // cw_bpf_debug("[Response][HTTP]:pid:%d|tid:%d",k.pid,k.fd);
  262. // cw_bpf_debug("[Response][HTTP]:is_tls:%d|tid:%d",k.is_tls,k.stream_id);
  263. return 0;
  264. }
  265. e->duration = bpf_ktime_get_ns() - req->ns;
  266. // cw_bpf_debug("[Response][HTTP]:duration->ns:%d\n",e->duration);
  267. e->protocol = PROTOCOL_TRACE;
  268. e->status = http_status;
  269. e->pid = k.pid;
  270. e->fd = k.fd;
  271. // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
  272. e->trace_start = 0;
  273. e->trace_end = 1;
  274. e->trace_id = trace_id;
  275. e->payload_size = size;
  276. COPY_PAYLOAD(e->payload, size, payload);
  277. bpf_map_delete_elem(&active_l7_requests, &k);
  278. bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
  279. // 发送事件到用户空间 end
  280. __u64 k_version = load_filter_pid();
  281. cw_bpf_debug("filter_pid:%d", k_version);
  282. struct test_t *ttt = bpf_map_lookup_elem(&test_heap, &zero);
  283. if (!ttt) {
  284. return 0;
  285. }
  286. // struct member_fields_offset *off = bpf_map_lookup_elem(&__members_offset, &zero);
  287. // if (!off) {
  288. // return 0;
  289. // }
  290. // cw_bpf_debug("off->task__files_offset:%x", off->task__files_offset);
  291. cw_bpf_debug("e->test_id:%d", ttt->test_id);
  292. cw_bpf_debug("HTTP_END");
  293. return 0;
  294. }
  295. if (is_http_request(payload)) {
  296. cw_bpf_debug("[Enter][HTTP222]:TGID:%d|FD:%d\n",k.pid,k.fd);
  297. req->protocol = PROTOCOL_HTTP;
  298. } else if (is_postgres_query(payload, size, &req->request_type)) {
  299. if (req->request_type == POSTGRES_FRAME_CLOSE) {
  300. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  301. if (!e) {
  302. return 0;
  303. }
  304. e->protocol = PROTOCOL_POSTGRES;
  305. e->method = METHOD_STATEMENT_CLOSE;
  306. e->payload_size = size;
  307. COPY_PAYLOAD(e->payload, size, payload);
  308. send_event(ctx, e, k.pid, k.fd);
  309. return 0;
  310. }
  311. req->protocol = PROTOCOL_POSTGRES;
  312. } else if (is_redis_query(payload, size)) {
  313. cw_bpf_debug("[Enter][Redis]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
  314. req->protocol = PROTOCOL_REDIS;
  315. } else if (is_memcached_query(payload, size)) {
  316. req->protocol = PROTOCOL_MEMCACHED;
  317. } else if (is_mysql_query(payload, size, &req->request_type)) {
  318. cw_bpf_debug("[Enter][Mysql]:thread_id:%d\n",tid);
  319. if (req->request_type == MYSQL_COM_STMT_CLOSE) {
  320. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  321. if (!e) {
  322. return 0;
  323. }
  324. e->protocol = PROTOCOL_MYSQL;
  325. e->method = METHOD_STATEMENT_CLOSE;
  326. e->payload_size = size;
  327. COPY_PAYLOAD(e->payload, size, payload);
  328. send_event(ctx, e, k.pid, k.fd);
  329. return 0;
  330. }
  331. req->protocol = PROTOCOL_MYSQL;
  332. } else if (is_mongo_query(payload, size)) {
  333. req->protocol = PROTOCOL_MONGO;
  334. } else if (is_rabbitmq_produce(payload, size)) {
  335. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  336. if (!e) {
  337. return 0;
  338. }
  339. e->protocol = PROTOCOL_RABBITMQ;
  340. e->method = METHOD_PRODUCE;
  341. send_event(ctx, e, k.pid, k.fd);
  342. return 0;
  343. } else if (nats_method(payload, size) == METHOD_PRODUCE) {
  344. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  345. if (!e) {
  346. return 0;
  347. }
  348. e->protocol = PROTOCOL_NATS;
  349. e->method = METHOD_PRODUCE;
  350. send_event(ctx, e, k.pid, k.fd);
  351. return 0;
  352. } else if (is_cassandra_request(payload, size, &k.stream_id)) {
  353. req->protocol = PROTOCOL_CASSANDRA;
  354. } else if (is_kafka_request(payload, size, &req->request_id)) {
  355. req->protocol = PROTOCOL_KAFKA;
  356. struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
  357. if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
  358. req->ns = prev_req->ns;
  359. }
  360. } else if (looks_like_http2_frame(payload, size, METHOD_HTTP2_CLIENT_FRAMES)) {
  361. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  362. if (!e) {
  363. return 0;
  364. }
  365. e->protocol = PROTOCOL_HTTP2;
  366. e->method = METHOD_HTTP2_CLIENT_FRAMES;
  367. e->duration = bpf_ktime_get_ns();
  368. e->payload_size = size;
  369. COPY_PAYLOAD(e->payload, size, payload);
  370. send_event(ctx, e, k.pid, k.fd);
  371. return 0;
  372. } else if (is_dubbo2_request(payload, size)) {
  373. req->protocol = PROTOCOL_DUBBO2;
  374. }
  375. if (req->protocol == PROTOCOL_UNKNOWN) {
  376. return 0;
  377. }
  378. if (req->ns == 0) {
  379. req->ns = bpf_ktime_get_ns();
  380. }
  381. COPY_PAYLOAD(req->payload, size, payload);
  382. bpf_map_update_elem(&active_l7_requests, &k, req, BPF_NOEXIST);
  383. return 0;
  384. }
  385. static inline __attribute__((__always_inline__))
  386. int trace_enter_read(__u64 id, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
  387. struct read_args args = {};
  388. args.fd = fd;
  389. args.buf = buf;
  390. args.ret = ret;
  391. args.iovlen = iovlen;
  392. __u32 pid = id >> 32;
  393. bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
  394. return 0;
  395. }
  396. static inline __attribute__((__always_inline__))
  397. int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) {
  398. __u32 tid = (__u32)id;
  399. if (load_filter_pid() != 0 && pid != load_filter_pid()) {
  400. return 0;
  401. }
  402. struct read_args *args = bpf_map_lookup_elem(&active_reads, &id);
  403. if (!args) {
  404. return 0;
  405. }
  406. struct l7_request_key k = {};
  407. k.pid = pid;
  408. k.fd = args->fd;
  409. k.is_tls = is_tls;
  410. k.stream_id = -1;
  411. bpf_map_delete_elem(&active_reads, &id);
  412. if (ret <= 0) {
  413. return 0;
  414. }
  415. if (args->ret) {
  416. if (bpf_probe_read(&ret, sizeof(ret), (void*)args->ret)) {
  417. return 0;
  418. };
  419. if (ret <= 0) {
  420. return 0;
  421. }
  422. }
  423. int zero = 0;
  424. char* payload = args->buf;
  425. if (args->iovlen) {
  426. payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
  427. if (!payload) {
  428. return 0;
  429. }
  430. ret = read_iovec(args->buf, args->iovlen, ret, payload);
  431. if (!ret) {
  432. return 0;
  433. }
  434. }
  435. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  436. if (!e) {
  437. return 0;
  438. }
  439. e->fd = k.fd;
  440. e->pid = k.pid;
  441. e->protocol = PROTOCOL_UNKNOWN;
  442. e->status = STATUS_UNKNOWN;
  443. e->method = METHOD_UNKNOWN;
  444. e->statement_id = 0;
  445. e->payload_size = 0;
  446. // __u32 k0 = 0;
  447. // struct member_fields_offset *offset = members_offset__lookup(&k0);
  448. // if (!offset)
  449. // return -1;
  450. // void *sk = get_socket_from_fd(args->fd, offset);
  451. // struct conn_info_t *conn_info, __conn_info = { 0 };
  452. // conn_info = &__conn_info;
  453. ////
  454. // init_conn_info(id >> 32, args->fd, &__conn_info, sk, offset);
  455. ////
  456. // infer_dns_message(payload, (int)PT_REGS_RC((struct pt_regs *)ctx),
  457. // conn_info);
  458. // 被调用方http入口
  459. if (is_http_request(payload)) {
  460. struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
  461. if (!req)
  462. {
  463. return 0;
  464. } else
  465. {
  466. req->protocol = PROTOCOL_HTTP;
  467. req->payload_size = ret;
  468. req->ns = bpf_ktime_get_ns();
  469. COPY_PAYLOAD(req->payload, ret, payload);
  470. // cw_bpf_debug("[Receive][HTTP]:pid:%d|tid:%d",k.pid,k.fd);
  471. // cw_bpf_debug("[Receive][HTTP]:is_tls:%d|tid:%d",k.is_tls,k.stream_id);
  472. bpf_map_update_elem(&active_l7_requests, &k, req, BPF_NOEXIST);
  473. }
  474. cw_bpf_debug("[trace start][Receive][HTTP]:thread_id:%d\n",tid);
  475. // cw_bpf_debug("[Receive][HTTP] payload1:%s|type:%s\n",payload,"type");
  476. // __u64 goid = 0;
  477. // goid = get_rw_goid(120 * NS_PER_SEC, 1);
  478. // cw_bpf_debug("[Receive][HTTP] goid:%llu\n",goid);
  479. // __u64 goid = get_rw_goid(120 * NS_PER_SEC, 1);
  480. // cw_bpf_debug("[Receive][HTTP]:thread_id:%d|goid:%d|FD:%d\n", tid, goid, k.fd);
  481. struct apm_trace_key_t trace_key = {0};
  482. struct apm_trace_info_t trace_info = {0};
  483. trace_key.tgid = pid;
  484. trace_key.pid = tid;
  485. __u64 uid_base = bpf_ktime_get_ns();
  486. trace_info.trace_id = bpf_get_current_pid_tgid() + uid_base;
  487. e->trace_start = 1;
  488. e->trace_end = 0;
  489. e->protocol = PROTOCOL_TRACE;
  490. e->trace_id = trace_info.trace_id;
  491. e->payload_size = ret;
  492. COPY_PAYLOAD(e->payload, ret, payload);
  493. // http trace
  494. struct fd_trace_key_t fd_trace_key = {};
  495. fd_trace_key.tgid = pid;
  496. fd_trace_key.fd = k.fd;
  497. bpf_map_update_elem(&fd_trace_info_heap, &fd_trace_key, &trace_info, BPF_NOEXIST);
  498. cw_bpf_debug("[Receive][HTTP] pid:%d,fd:%d,traceid:%llu", fd_trace_key.tgid, fd_trace_key.fd,
  499. trace_info.trace_id);
  500. // 入口方法缓存 bpf_map_update_elem(map, key, value, options)
  501. bpf_map_update_elem(&trace_info_heap, &trace_key, &trace_info, BPF_NOEXIST);
  502. bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
  503. cw_bpf_debug("[Receive][HTTP] to user space");
  504. return 0;
  505. }
  506. if (is_rabbitmq_consume(payload, ret)) {
  507. e->protocol = PROTOCOL_RABBITMQ;
  508. e->method = METHOD_CONSUME;
  509. send_event(ctx, e, k.pid, k.fd);
  510. return 0;
  511. }
  512. if (nats_method(payload, ret) == METHOD_CONSUME) {
  513. e->protocol = PROTOCOL_NATS;
  514. e->method = METHOD_CONSUME;
  515. send_event(ctx, e, k.pid, k.fd);
  516. return 0;
  517. }
  518. struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
  519. int response = 0;
  520. if (!req) {
  521. if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
  522. req = bpf_map_lookup_elem(&active_l7_requests, &k);
  523. if (!req) {
  524. return 0;
  525. }
  526. response = 1;
  527. } else if (looks_like_http2_frame(payload, ret, METHOD_HTTP2_SERVER_FRAMES)) {
  528. e->protocol = PROTOCOL_HTTP2;
  529. e->method = METHOD_HTTP2_SERVER_FRAMES;
  530. e->duration = bpf_ktime_get_ns();
  531. e->payload_size = ret;
  532. COPY_PAYLOAD(e->payload, ret, payload);
  533. send_event(ctx, e, k.pid, k.fd);
  534. return 0;
  535. } else {
  536. return 0;
  537. }
  538. }
  539. e->protocol = req->protocol;
  540. e->payload_size = req->payload_size;
  541. COPY_PAYLOAD(e->payload, req->payload_size, req->payload);
  542. bpf_map_delete_elem(&active_l7_requests, &k);
  543. if (e->protocol == PROTOCOL_HTTP) {
  544. __u64 trace_id = get_trace_id(pid, tid);
  545. e->trace_id = trace_id;
  546. cw_bpf_debug("[Response][HTTP222]:thread_id:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
  547. cw_bpf_debug("[Response][HTTP222] trace_id:%llu", trace_id);
  548. // 请求报文
  549. cw_bpf_debug("[Response][HTTP222] req-payload:%s",e->payload);
  550. // 响应报文
  551. cw_bpf_debug("[Response][HTTP222] resp-payload:%s",payload);
  552. response = is_http_response(payload, &e->status);
  553. } else if (e->protocol == PROTOCOL_POSTGRES) {
  554. response = is_postgres_response(payload, ret, &e->status);
  555. if (req->request_type == POSTGRES_FRAME_PARSE) {
  556. e->method = METHOD_STATEMENT_PREPARE;
  557. }
  558. } else if (e->protocol == PROTOCOL_REDIS) {
  559. cw_bpf_debug("[Response][Redis]:TGID:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
  560. __u64 trace_id = get_trace_id(pid, tid);
  561. cw_bpf_debug("[Redis] trace_id:%llu", trace_id);
  562. e->trace_id = trace_id;
  563. response = is_redis_response(payload, ret, &e->status);
  564. } else if (e->protocol == PROTOCOL_MEMCACHED) {
  565. response = is_memcached_response(payload, ret, &e->status);
  566. } else if (e->protocol == PROTOCOL_MYSQL) {
  567. cw_bpf_debug("[Response][Mysql]:thread_id:%d\n",tid);
  568. __u64 trace_id = get_trace_id(pid, tid);
  569. // cw_bpf_debug("[Mysql] trace_id:%llu", trace_id);
  570. e->trace_id = trace_id;
  571. response = is_mysql_response(payload, ret, req->request_type, &e->statement_id, &e->status);
  572. if (req->request_type == MYSQL_COM_STMT_PREPARE) {
  573. e->method = METHOD_STATEMENT_PREPARE;
  574. }
  575. } else if (e->protocol == PROTOCOL_MONGO) {
  576. response = is_mongo_response(payload, ret, req->partial);
  577. if (response == 2) { // partial
  578. struct l7_request *r = bpf_map_lookup_elem(&l7_request_heap, &zero);
  579. if (!r) {
  580. return 0;
  581. }
  582. r->partial = 1;
  583. r->protocol = e->protocol;
  584. r->ns = req->ns;
  585. r->payload_size = req->payload_size;
  586. COPY_PAYLOAD(r->payload, req->payload_size, req->payload);
  587. bpf_map_update_elem(&active_l7_requests, &k, r, BPF_ANY);
  588. return 0;
  589. }
  590. } else if (e->protocol == PROTOCOL_KAFKA) {
  591. response = is_kafka_response(payload, req->request_id);
  592. } else if (e->protocol == PROTOCOL_DUBBO2) {
  593. response = is_dubbo2_response(payload, &e->status);
  594. }
  595. if (!response) {
  596. return 0;
  597. }
  598. e->duration = bpf_ktime_get_ns() - req->ns;
  599. send_event(ctx, e, k.pid, k.fd);
  600. return 0;
  601. }
  602. SEC("tracepoint/syscalls/sys_enter_write")
  603. int sys_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  604. return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size, 0);
  605. }
  606. SEC("tracepoint/syscalls/sys_enter_writev")
  607. int sys_enter_writev(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  608. return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, 0, ctx->size);
  609. }
  610. SEC("tracepoint/syscalls/sys_enter_sendmsg")
  611. int sys_enter_sendmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  612. struct l7_user_msghdr msghdr = {};
  613. if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
  614. return 0;
  615. }
  616. return trace_enter_write(ctx, ctx->fd, 0, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
  617. }
  618. SEC("tracepoint/syscalls/sys_enter_sendto")
  619. int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  620. return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size, 0);
  621. }
  622. SEC("tracepoint/syscalls/sys_enter_read")
  623. int sys_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  624. __u64 id = bpf_get_current_pid_tgid();
  625. return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
  626. }
  627. SEC("tracepoint/syscalls/sys_enter_readv")
  628. int sys_enter_readv(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  629. __u64 id = bpf_get_current_pid_tgid();
  630. return trace_enter_read(id, ctx->fd, ctx->buf, 0, ctx->size);
  631. }
  632. SEC("tracepoint/syscalls/sys_enter_recvmsg")
  633. int sys_enter_recvmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  634. __u64 id = bpf_get_current_pid_tgid();
  635. struct l7_user_msghdr msghdr = {};
  636. if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
  637. return 0;
  638. }
  639. return trace_enter_read(id, ctx->fd, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
  640. }
  641. SEC("tracepoint/syscalls/sys_enter_recvfrom")
  642. int sys_enter_recvfrom(struct trace_event_raw_sys_enter_rw__stub* ctx) {
  643. __u64 id = bpf_get_current_pid_tgid();
  644. return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
  645. }
  646. SEC("tracepoint/syscalls/sys_exit_read")
  647. int sys_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
  648. __u64 pid_tgid = bpf_get_current_pid_tgid();
  649. __u32 pid = pid_tgid >> 32;
  650. return trace_exit_read(ctx, pid_tgid, pid, 0, ctx->ret);
  651. }
  652. SEC("tracepoint/syscalls/sys_exit_readv")
  653. int sys_exit_readv(struct trace_event_raw_sys_exit_rw__stub* ctx) {
  654. __u64 pid_tgid = bpf_get_current_pid_tgid();
  655. __u32 pid = pid_tgid >> 32;
  656. return trace_exit_read(ctx, pid_tgid, pid, 0, ctx->ret);
  657. }
  658. SEC("tracepoint/syscalls/sys_exit_recvmsg")
  659. int sys_exit_recvmsg(struct trace_event_raw_sys_exit_rw__stub* ctx) {
  660. __u64 pid_tgid = bpf_get_current_pid_tgid();
  661. __u32 pid = pid_tgid >> 32;
  662. return trace_exit_read(ctx, pid_tgid, pid, 0, ctx->ret);
  663. }
  664. SEC("tracepoint/syscalls/sys_exit_recvfrom")
  665. int sys_exit_recvfrom(struct trace_event_raw_sys_exit_rw__stub* ctx) {
  666. __u64 pid_tgid = bpf_get_current_pid_tgid();
  667. __u32 pid = pid_tgid >> 32;
  668. return trace_exit_read(ctx, pid_tgid, pid, 0, ctx->ret);
  669. }
  670. //
  671. //SEC("tracepoint/syscalls/sys_exit_recvfrom")
  672. //int sys_exit_recvfrom222(struct trace_event_raw_sys_exit_rw__stub* ctx) {
  673. // __u64 pid_tgid = bpf_get_current_pid_tgid();
  674. // __u32 pid = pid_tgid >> 32;
  675. // if (pid == filterPid) {
  676. // cw_bpf_debug("sys_exit_recvfrom222");
  677. // }
  678. // return 0;
  679. //}