// grpc.server.probe.bpf.c
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. #include "arguments.h"
  4. #include "go_types.h"
  5. #include "go_net.h"
  6. #include "span_context.h"
  7. #include "go_context.h"
  8. #include "uprobe.h"
  9. // #include "trace/start_span.h"
  10. // char __license[] SEC("license") = "Dual MIT/GPL";
  11. // #define MAX_SIZE 100
// Capacity (max_entries) of the grpc_events and streamid_to_grpc_events hash maps.
#define MAX_CONCURRENT 50
// Upper bound on the number of header fields scanned per frame in
// uprobe_http2Server_operateHeader.
#define MAX_HEADERS 20
// NOTE(review): not referenced in the visible part of this file.
#define MAX_HEADER_STRING 50
// NOTE(review): not referenced in the visible part of this file; the events
// emitted by these probes are tagged PROTOCOL_TRACE instead.
#define PROTOCOL_GRPC 16
// Per-request state tracked between the handleStream entry and return probes.
// It is the value type of grpc_events, streamid_to_grpc_events and
// grpc_storage_map, so its layout must stay stable.
struct grpc_request_t {
    // Common span fields supplied by a macro defined elsewhere; the probes in
    // this file access start_time, end_time, sc and psc on this struct.
    BASE_SPAN_PROPERTIES
    // gRPC method name, filled by get_go_string_from_user_ptr() in handleStream.
    char method[MAX_SIZE];
    // Only written by the (currently commented-out) writeStatus helper.
    u32 status_code;
    // Only written by the (currently commented-out) server address code.
    net_addr_t local_addr;
    // Only written by the (currently commented-out) writeStatus helper.
    u8 has_status;
    // Stream ID read from the stream object in handleStream.
    u32 stream_id;
    // Length of `method` in bytes; used as the payload size of emitted events.
    u64 method_size;
};
// In-flight requests keyed by the goroutine pointer (GOROUTINE(ctx)).
// Entries are inserted by handleStream and removed by the return probes.
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __type(key, void *);
    __type(value, struct grpc_request_t);
    __uint(max_entries, MAX_CONCURRENT);
} grpc_events SEC(".maps");
// Requests keyed by stream ID. In the visible code this map is only looked up
// and deleted from (in handleStream); nothing here inserts into it.
// NOTE(review): confirm whether any other probe still populates this map.
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __type(key, u32);
    __type(value, struct grpc_request_t);
    __uint(max_entries, MAX_CONCURRENT);
} streamid_to_grpc_events SEC(".maps");
// Single-slot per-CPU array used by handleStream as scratch space for a
// struct grpc_request_t when no streamid_to_grpc_events entry exists.
struct {
    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
    __uint(key_size, sizeof(u32));
    __uint(value_size, sizeof(struct grpc_request_t));
    __uint(max_entries, 1);
} grpc_storage_map SEC(".maps");
// One element of the header-field slice read from the traced process in
// uprobe_http2Server_operateHeader.
// NOTE(review): presumably mirrors the memory layout of Go's hpack.HeaderField
// (Name, Value, Sensitive) - confirm against the targeted x/net version.
struct hpack_header_field {
    struct go_string_ot name;  // header name (pointer + length into user memory)
    struct go_string_ot value; // header value (pointer + length into user memory)
    bool sensitive;
};
// Struct field offsets used by the probes. These were originally injected at
// init (see the commented-out `volatile const` declarations); they are now
// either hard-coded below or read from ebpf_proc_info at run time.
// volatile const u64 stream_method_ptr_pos;
// u64 stream_method_ptr_pos = 80; // needs per-version handling
// volatile const u64 frame_fields_pos;
u64 frame_fields_pos = 8; // a fixed value of 8 is sufficient; per-version handling is no longer done
// volatile const u64 frame_stream_id_pod;
u64 frame_stream_id_pod = 8; // a fixed value of 8 is sufficient; per-version handling is no longer done
// volatile const u64 stream_id_pos;
u64 stream_id_pos = 0; // a fixed value of 0 is sufficient; per-version handling is no longer done
// volatile const u64 stream_ctx_pos;
// u64 stream_ctx_pos = 32; // needs per-version handling
// volatile const u64 server_stream_stream_pos;
u64 server_stream_stream_pos = 0; // unused for versions before 1.69; used for versions from 1.69 on
// volatile const bool is_new_frame_pos;
// bool is_new_frame_pos; // false for < 1.60, true for >= 1.60; assigned directly in user space, no further handling here
// volatile const u64 status_s_pos;
static u64 status_s_pos = 0; // a fixed value is sufficient; per-version handling is no longer done
// volatile const u64 status_code_pos;
static u64 status_code_pos = 40; // a fixed value is sufficient; per-version handling is no longer done
// volatile const u64 http2server_peer_pos;
// u64 http2server_peer_pos;
// volatile const u64 peer_local_addr_pos;
// u64 peer_local_addr_pos;
// volatile const bool server_addr_supported;
  72. static __always_inline long
  73. dummy_extract_span_context_from_headers(void *stream_id, struct span_context *parent_span_context) {
  74. return 0;
  75. }
  76. // handleStream handles gRPC stream telemetry.
  77. //
  78. // Arguments:
  79. // - ctx: the pt_regs passed to the uprobe function
  80. // - stream_ptr: pointer to the transport.Stream tracking the stream
  81. // - go_context: the parsed Go context.Context
  82. //
  83. // Returns 0 on success, otherwise a negative error value in case of failure.
  84. static __always_inline int
  85. handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context) {
  86. __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
  87. struct ebpf_proc_info *info =
  88. bpf_map_lookup_elem(&proc_info_map, &tgid);
  89. if (!info) {
  90. return -1;
  91. }
  92. if (go_context == NULL) {
  93. cw_bpf_debug("grpc:server:handleStream: NULL go_context");
  94. return -1;
  95. }
  96. // cw_bpf_debug("info->stream_method_ptr_pos is %d\n", info->stream_method_ptr_pos);
  97. if (stream_ptr == NULL) {
  98. cw_bpf_debug("grpc:server:handleStream: NULL stream_ptr");
  99. return -1;
  100. }
  101. void *key = (void *)GOROUTINE(ctx);
  102. void *grpcReq_event_ptr = bpf_map_lookup_elem(&grpc_events, &key);
  103. if (grpcReq_event_ptr != NULL) {
  104. cw_bpf_debug("grpc:server:handleStream: event already tracked");
  105. return 0;
  106. }
  107. // Get parent context if exists
  108. u32 stream_id = 0;
  109. __u32 zero = 0;
  110. long rc =
  111. bpf_probe_read_user(&stream_id, sizeof(stream_id), (void *)(stream_ptr + stream_id_pos));
  112. if (rc != 0) {
  113. cw_bpf_debug("grpc:server:handleStream: failed to read stream ID");
  114. return -2;
  115. }
  116. struct grpc_request_t *grpcReq = bpf_map_lookup_elem(&streamid_to_grpc_events, &stream_id);
  117. if (grpcReq == NULL) {
  118. // No parent span context, generate new span context
  119. u32 zero = 0;
  120. grpcReq = bpf_map_lookup_elem(&grpc_storage_map, &zero);
  121. if (grpcReq == NULL) {
  122. cw_bpf_debug("grpc:server:handleStream: failed to get grpcReq");
  123. return 0;
  124. }
  125. }
  126. grpcReq->start_time = bpf_ktime_get_ns();
  127. grpcReq->stream_id = stream_id;
  128. // start_span_params_t start_span_params = {
  129. // .ctx = ctx,
  130. // .sc = &grpcReq->sc,
  131. // .psc = &grpcReq->psc,
  132. // .go_context = go_context,
  133. // // The parent span context is set by operateHeader probe
  134. // .get_parent_span_context_fn = dummy_extract_span_context_from_headers,
  135. // .get_parent_span_context_arg = NULL,
  136. // };
  137. // start_span(&start_span_params);
  138. // Set attributes
  139. void *method_ptr = stream_ptr + info->stream_method_ptr_pos;
  140. bool parsed_method =
  141. get_go_string_from_user_ptr(method_ptr, grpcReq->method, sizeof(grpcReq->method));
  142. if (!parsed_method) {
  143. cw_bpf_debug("grpc:server:handleStream: failed to read gRPC method from stream");
  144. bpf_map_delete_elem(&streamid_to_grpc_events, &stream_id);
  145. return -3;
  146. }
  147. // cw_bpf_debug("grpc:server:handleStream: get the method is %s\n", grpcReq->method);
  148. // if (server_addr_supported) {
  149. // void *http2server = get_argument(ctx, 3);
  150. // if (http2server != NULL) {
  151. // void *local_addr_ptr = 0;
  152. // void *local_addr_pos = http2server + http2server_peer_pos + peer_local_addr_pos;
  153. // bpf_probe_read_user(
  154. // &local_addr_ptr, sizeof(local_addr_ptr), get_go_interface_instance(local_addr_pos));
  155. // get_tcp_net_addr_from_tcp_addr(ctx, &grpcReq->local_addr, (void *)(local_addr_ptr));
  156. // } else {
  157. // cw_bpf_debug("grpc:server:handleStream: failed to get http2server arg");
  158. // }
  159. // }
  160. // Write event
  161. rc = bpf_map_update_elem(&grpc_events, &key, grpcReq, 0);
  162. if (rc != 0) {
  163. cw_bpf_debug("grpc:server:handleStream: failed to update event");
  164. return -4;
  165. }
  166. start_tracking_span(go_context->data, &grpcReq->sc);
  167. //处理http请求之前,确认进程信息是否存在
  168. __u64 id = bpf_get_current_pid_tgid();
  169. __u32 pid = id >> 32;
  170. struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &pid);
  171. if (!proc_info) {
  172. cw_bpf_debug("[Trace End in l7][Response][HTTP]:no proc info. pid:%d \n",pid);
  173. return 0;
  174. }
  175. // cw_bpf_debug("start get apm data\n");
  176. // struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&apm_span_context_heap3, &zero);
  177. // if (cw_parent_span_context == NULL) {
  178. // return -1;
  179. // }
  180. // __builtin_memset(cw_parent_span_context, 0, sizeof(struct apm_span_context));
  181. // generate_random_bytes(cw_parent_span_context->trace_id, TRACE_ID_SIZE);
  182. // cw_save_parent_tracking_span(cw_parent_span_context);
  183. struct l7_request_key k = {};
  184. k.pid = pid;
  185. k.fd = 0;
  186. k.is_tls = 0;
  187. k.stream_id = stream_id;
  188. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  189. if (!e) {
  190. return 0;
  191. }
  192. e->fd = k.fd;
  193. e->pid = k.pid;
  194. e->status = STATUS_UNKNOWN;
  195. e->method = METHOD_UNKNOWN;
  196. e->statement_id = 0;
  197. // 拷贝 grpcReq->method 到 payload 并设置 payload_size
  198. // 手动计算字符串长度(在 eBPF 中不能使用 strlen)
  199. u64 method_len = 0;
  200. for (int i = 0; i < MAX_SIZE; i++) {
  201. if (grpcReq->method[i] == '\0') {
  202. method_len = i;
  203. break;
  204. }
  205. }
  206. // 如果没有找到 '\0',使用最大长度
  207. if (method_len == 0) {
  208. method_len = MAX_SIZE - 1;
  209. }
  210. grpcReq->method_size = method_len;
  211. e->payload_size = method_len;
  212. COPY_PAYLOAD(e->payload, method_len, grpcReq->method);
  213. // cw_bpf_debug("grpc:server:handleStream: get the payload size is %d\n", e->payload_size);
  214. struct apm_trace_info_t trace_info = cw_save_trace_info(id,pid, k.fd);
  215. e->trace_start = 1;
  216. e->trace_end = 0;
  217. e->trace_type = 1;
  218. e->protocol = PROTOCOL_TRACE;
  219. e->trace_id = trace_info.trace_id;
  220. //不发送payload
  221. bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
  222. return 0;
  223. }
  224. // writeStatus writes the OTel status to any active spans.
  225. //
  226. // Arguments:
  227. // - ctx: the pt_regs passed to the uprobe function
  228. // - status_ptr: pointer to the status.Stream holding the status info
  229. //
  230. // Returns 0 on success, otherwise a negative error value in case of failure.
  231. // static __always_inline int writeStatus(struct pt_regs *ctx, void *status_ptr) {
  232. // if (status_ptr == NULL) {
  233. // cw_bpf_debug("grpc:server:writeStatus: NULL status_ptr");
  234. // return -1;
  235. // }
  236. // void *key = (void *)GOROUTINE(ctx);
  237. // struct grpc_request_t *req_ptr = bpf_map_lookup_elem(&grpc_events, &key);
  238. // if (req_ptr == NULL) {
  239. // cw_bpf_debug("grpc:server:handleStream: failed to lookup grpc request");
  240. // return -2;
  241. // }
  242. // void *s_ptr = 0;
  243. // long rc = bpf_probe_read_user(&s_ptr, sizeof(s_ptr), (void *)(status_ptr + status_s_pos));
  244. // if (rc != 0) {
  245. // cw_bpf_debug("grpc:server:handleStream: failed to read Status.s");
  246. // return -3;
  247. // }
  248. // // Get status code from Status.s pointer
  249. // rc = bpf_probe_read_user(
  250. // &req_ptr->status_code, sizeof(req_ptr->status_code), (void *)(s_ptr + status_code_pos));
  251. // if (rc != 0) {
  252. // cw_bpf_debug("grpc:server:handleStream: failed to read status code");
  253. // return -4;
  254. // }
  255. // req_ptr->has_status = true;
  256. // return 0;
  257. // }
  258. // This instrumentation attaches uprobe to the following function:
  259. // func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo)
  260. //
  261. // This is only compatible with versions < 1.69.0 of the Server.
  262. SEC("uprobe/server_handleStream")
  263. int uprobe_server_handleStream(struct pt_regs *ctx) {
  264. // cw_bpf_debug("enter the uprobe_server_handleStream");
  265. u64 stream_pos = 4;
  266. void *stream_ptr = get_argument(ctx, stream_pos);
  267. // cw_bpf_debug("enter uprobe_server_handleStream\n");
  268. // Get key
  269. __u64 pid_tgid = bpf_get_current_pid_tgid();
  270. __u32 tgid = pid_tgid >> 32;
  271. struct ebpf_proc_info *proc_info =
  272. bpf_map_lookup_elem(&proc_info_map, &tgid);
  273. if(!proc_info)
  274. {
  275. cw_bpf_debug("[uprobe_HandlerFunc_ServeHTTP] no proc info");
  276. return 0;
  277. }
  278. struct go_iface go_context = {0};
  279. get_Go_context(ctx, stream_pos, proc_info->ctx_ptr_pos, false);
  280. return handleStream(ctx, stream_ptr, &go_context);
  281. }
  282. // UPROBE_RETURN(server_handleStream, struct grpc_request_t, grpc_events)
  283. SEC("uprobe/server_handleStream")
  284. int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
  285. // cw_bpf_debug("enter the uprobe_server_handleStream return");
  286. void *key = (void *)GOROUTINE(ctx);
  287. __u64 id = bpf_get_current_pid_tgid();
  288. __u32 zero = 0;
  289. __u32 fd = 0;
  290. __u32 pid, tid;
  291. __u32 http_status = 200;
  292. pid = id >> 32;
  293. tid = (__u32)id;
  294. // cw_bpf_debug("enter uprobe_server_handleStream_Returns\n");
  295. struct l7_request_key k = {};
  296. k.pid = pid;
  297. k.fd = fd;
  298. k.is_tls = 0;
  299. k.stream_id = -1;
  300. struct grpc_request_t *event = bpf_map_lookup_elem(&grpc_events, &key);
  301. if (event == NULL) {
  302. cw_bpf_debug("grpc:server:uprobe/server_handleStream2Return: event is NULL");
  303. return -5;
  304. }
  305. event->end_time = bpf_ktime_get_ns();
  306. // output_span_event(ctx, event, sizeof(struct grpc_request_t), &event->sc);
  307. stop_tracking_span(&event->sc, &event->psc);
  308. bpf_map_delete_elem(&grpc_events, &key);
  309. struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
  310. struct apm_trace_info_t * start_trace_info = get_apm_trace_info_by_trace_key(trace_key);
  311. if (!start_trace_info) {
  312. return -1;
  313. }
  314. __u64 trace_id = start_trace_info->trace_id;
  315. __u32 event_count = cw_get_event_count(trace_id);
  316. cw_bpf_debug("[uprobeThread/pidpidpidpid][Trace End in l7][HTTP]pid:[%d]--[%lld]", tid, bpf_ktime_get_ns());
  317. cw_bpf_debug("[Trace End in l7][Response][HTTP] event_count:%d", event_count);
  318. cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
  319. // 发送事件到用户空间 start
  320. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  321. if (!e) {
  322. cw_clear_trace(pid, tid, fd);
  323. return -1;
  324. }
  325. // parent sc
  326. struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(start_trace_info->trace_key);
  327. if(cw_psc){
  328. cw_copy_byte_arrays(cw_psc->trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
  329. cw_copy_byte_arrays(cw_psc->assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
  330. cw_copy_byte_arrays(cw_psc->instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
  331. cw_copy_byte_arrays(cw_psc->app_id, e->app_id_from, APM_APP_ID_SIZE);
  332. cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
  333. cw_copy_byte_arrays(cw_psc->type_from, e->type_from, APM_TYPE_FROM_SIZE);
  334. }
  335. // struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
  336. // if (!req)
  337. // {
  338. // cw_clear_trace(pid, tid, fd);
  339. // return 0;
  340. // }
  341. // e->start_at = req->ns;
  342. e->start_at = event->start_time;
  343. // cw_bpf_debug("req->ns:%llu",req->ns);
  344. e->end_at = bpf_ktime_get_ns();
  345. e->duration = e->end_at - e->start_at;
  346. e->protocol = PROTOCOL_TRACE;
  347. e->status = http_status;
  348. e->pid = k.pid;
  349. e->fd = k.fd;
  350. // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
  351. e->trace_start = 0;
  352. e->trace_end = 1;
  353. e->trace_type = 1;
  354. e->trace_id = trace_id;
  355. e->payload_size = 0;
  356. e->event_count = event_count;
  357. e->payload_size = event->method_size;
  358. COPY_PAYLOAD(e->payload, event->method_size, event->method);
  359. // bpf_map_delete_elem(&active_l7_requests, &k);
  360. // 清除事件计数
  361. bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
  362. // 清除业务层trace信息
  363. clear_parent_span_context_by_trace_key(start_trace_info->trace_key);
  364. // 清除trace信息
  365. cw_clear_trace(pid, tid, fd);
  366. // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
  367. // struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
  368. // if (accept_conn) {
  369. // cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
  370. // cw_bpf_debug("rock enter the accept_conn function cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
  371. // e->sport = accept_conn->sport;
  372. // e->dport = accept_conn->dport;
  373. // __builtin_memcpy(&e->saddr, &accept_conn->saddr, sizeof(e->saddr));
  374. // __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
  375. // }
  376. //不发送payload
  377. bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
  378. // cw_bpf_debug("stop get apm data\n");
  379. return 0;
  380. }
  381. // This instrumentation attaches uprobe to the following function:
  382. // func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream)
  383. // https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/server.go#L1735
  384. //
  385. // This is only compatible with versions > 1.69.0 of the Server.
  386. SEC("uprobe/server_handleStream2")
  387. int uprobe_server_handleStream2(struct pt_regs *ctx) {
  388. __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
  389. struct ebpf_proc_info *info =
  390. bpf_map_lookup_elem(&proc_info_map, &tgid);
  391. if (!info) {
  392. return -1;
  393. }
  394. cw_bpf_debug("info->stream_ctx_pos is %d\n", info->stream_ctx_pos);
  395. u64 server_stream_pos = 4;
  396. // cw_bpf_debug("enter uprobe_server_handleStream2\n");
  397. void *server_stream_ptr = get_argument(ctx, server_stream_pos);
  398. if (server_stream_ptr == NULL) {
  399. cw_bpf_debug("grpc:server:uprobe/server_handleStream2: failed to get ServerStream arg");
  400. return -1;
  401. }
  402. void *stream_ptr;
  403. long rc = bpf_probe_read_user(
  404. &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
  405. if (rc != 0) {
  406. cw_bpf_debug("grpc:server:uprobe/server_handleStream2: failed to read stream_ptr");
  407. return -2;
  408. }
  409. struct go_iface go_context = {0};
  410. rc = bpf_probe_read_user(
  411. &go_context.type, sizeof(go_context.type), (void *)(stream_ptr + info->stream_ctx_pos));
  412. if (rc != 0) {
  413. cw_bpf_debug("grpc:server:uprobe/server_handleStream2: failed to read context type");
  414. return -3;
  415. }
  416. rc = bpf_probe_read_user(&go_context.data,
  417. sizeof(go_context.data),
  418. get_go_interface_instance(stream_ptr + info->stream_ctx_pos));
  419. if (rc != 0) {
  420. cw_bpf_debug("grpc:server:uprobe/server_handleStream2: failed to read context data");
  421. return -4;
  422. }
  423. return handleStream(ctx, stream_ptr, &go_context);
  424. }
  425. // This instrumentation attaches a return uprobe to the following function:
  426. // func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream)
  427. // https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/server.go#L1735
  428. //
  429. // This is only compatible with versions > 1.69.0 of the Server.
  430. SEC("uprobe/server_handleStream2")
  431. int uprobe_server_handleStream2_Returns(struct pt_regs *ctx) {
  432. u64 server_stream_pos = 4;
  433. void *server_stream_ptr = get_argument(ctx, server_stream_pos);
  434. void *key = NULL;
  435. if (server_stream_ptr == NULL) {
  436. // We might fail to get the pointer for versions of Go which use register ABI, as this function does not return anything.
  437. // This is not an error in that case so we can just go to the lookup which will happen by goroutine.
  438. goto lookup;
  439. }
  440. void *stream_ptr;
  441. long rc = bpf_probe_read_user(
  442. &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
  443. if (rc != 0) {
  444. cw_bpf_debug("grpc:server:uprobe/server_handleStream2Return: failed to read stream_ptr");
  445. return -2;
  446. }
  447. lookup:
  448. key = (void *)GOROUTINE(ctx);
  449. struct grpc_request_t *event = bpf_map_lookup_elem(&grpc_events, &key);
  450. if (event == NULL) {
  451. cw_bpf_debug("grpc:server:uprobe/server_handleStream2Return: event is NULL");
  452. return -5;
  453. }
  454. event->end_time = bpf_ktime_get_ns();
  455. // output_span_event(ctx, event, sizeof(struct grpc_request_t), &event->sc);
  456. stop_tracking_span(&event->sc, &event->psc);
  457. bpf_map_delete_elem(&grpc_events, &key);
  458. __u64 id = bpf_get_current_pid_tgid();
  459. __u32 zero = 0;
  460. __u32 fd = 0;
  461. __u32 pid, tid;
  462. __u32 http_status = 200;
  463. pid = id >> 32;
  464. tid = (__u32)id;
  465. // cw_bpf_debug("enter uprobe_server_handleStream_Returns\n");
  466. struct l7_request_key k = {};
  467. k.pid = pid;
  468. k.fd = fd;
  469. k.is_tls = 0;
  470. k.stream_id = -1;
  471. struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
  472. struct apm_trace_info_t * start_trace_info = get_apm_trace_info_by_trace_key(trace_key);
  473. if (!start_trace_info) {
  474. return -1;
  475. }
  476. __u64 trace_id = start_trace_info->trace_id;
  477. __u32 event_count = cw_get_event_count(trace_id);
  478. cw_bpf_debug("[uprobeThread/pidpidpidpid][Trace End in l7][HTTP]pid:[%d]--[%lld]", tid, bpf_ktime_get_ns());
  479. cw_bpf_debug("[Trace End in l7][Response][HTTP] event_count:%d", event_count);
  480. cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
  481. // 发送事件到用户空间 start
  482. struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
  483. if (!e) {
  484. cw_clear_trace(pid, tid, fd);
  485. return -1;
  486. }
  487. // parent sc
  488. struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(start_trace_info->trace_key);
  489. if(cw_psc){
  490. cw_copy_byte_arrays(cw_psc->trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
  491. cw_copy_byte_arrays(cw_psc->assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
  492. cw_copy_byte_arrays(cw_psc->instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
  493. cw_copy_byte_arrays(cw_psc->app_id, e->app_id_from, APM_APP_ID_SIZE);
  494. cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
  495. cw_copy_byte_arrays(cw_psc->type_from, e->type_from, APM_TYPE_FROM_SIZE);
  496. }
  497. // struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
  498. // if (!req)
  499. // {
  500. // cw_clear_trace(pid, tid, fd);
  501. // return 0;
  502. // }
  503. // e->start_at = req->ns;
  504. e->start_at = event->start_time;
  505. // cw_bpf_debug("req->ns:%llu",req->ns);
  506. e->end_at = bpf_ktime_get_ns();
  507. e->duration = e->end_at - e->start_at;
  508. e->protocol = PROTOCOL_TRACE;
  509. e->status = http_status;
  510. e->pid = k.pid;
  511. e->fd = k.fd;
  512. // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
  513. e->trace_start = 0;
  514. e->trace_end = 1;
  515. e->trace_type = 1;
  516. e->trace_id = trace_id;
  517. e->payload_size = 0;
  518. e->event_count = event_count;
  519. e->payload_size = event->method_size;
  520. COPY_PAYLOAD(e->payload, event->method_size, event->method);
  521. // bpf_map_delete_elem(&active_l7_requests, &k);
  522. // 清除事件计数
  523. bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
  524. // 清除业务层trace信息
  525. clear_parent_span_context_by_trace_key(start_trace_info->trace_key);
  526. // 清除trace信息
  527. cw_clear_trace(pid, tid, fd);
  528. // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
  529. // struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
  530. // if (accept_conn) {
  531. // cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
  532. // cw_bpf_debug("rock enter the accept_conn function cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
  533. // e->sport = accept_conn->sport;
  534. // e->dport = accept_conn->dport;
  535. // __builtin_memcpy(&e->saddr, &accept_conn->saddr, sizeof(e->saddr));
  536. // __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
  537. // }
  538. //不发送payload
  539. bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
  540. // cw_bpf_debug("stop get apm data\n");
  541. return 0;
  542. }
  543. // func (d *http2Server) operateHeader(frame *http2.MetaHeadersFrame) error
  544. // for version 1.60 and above:
  545. // func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error
  546. SEC("uprobe/http2Server_operateHeader")
  547. int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
  548. // cw_bpf_debug("enter the uprobe_http2Server_operateHeader");
  549. __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
  550. struct ebpf_proc_info *info =
  551. bpf_map_lookup_elem(&proc_info_map, &tgid);
  552. if (!info) {
  553. return -1;
  554. }
  555. s32 find_w3c = 0;
  556. void *arg4 = get_argument(ctx, 4);
  557. void *arg2 = get_argument(ctx, 2);
  558. void *frame_ptr = info->is_new_frame_pos ? arg4 : arg2;
  559. struct go_slice header_fields = {};
  560. bpf_probe_read(&header_fields, sizeof(header_fields), (void *)(frame_ptr + frame_fields_pos));
  561. char key[CW_HEADER_KEY_LENGTH + 1] = "cwtrace";
  562. // 确保字符串以 null 结尾
  563. key[CW_HEADER_KEY_LENGTH] = '\0';
  564. // cw_bpf_debug("enter the uprobe_http2Server_operateHeader\n");
  565. __u32 zero = 0;
  566. struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&apm_span_context_heap3, &zero);
  567. if (cw_parent_span_context == NULL) {
  568. return -1;
  569. }
  570. __builtin_memset(cw_parent_span_context, 0, sizeof(struct apm_span_context));
  571. // 优化循环:减少复杂度,提前退出
  572. for (s32 i = 0; i < MAX_HEADERS && i < header_fields.len; i++) {
  573. struct hpack_header_field hf = {};
  574. long res = bpf_probe_read(&hf, sizeof(hf), (void *)(header_fields.ptr + (i * sizeof(hf))));
  575. if (res != 0) {
  576. continue; // 读取失败,跳过
  577. }
  578. // 简化条件判断
  579. if (hf.name.len != CW_HEADER_KEY_LENGTH || hf.value.len != CW_HEADER_VAL_LENGTH) {
  580. continue;
  581. }
  582. // cw_bpf_debug("found traceparent header name is %s", hf.name.str);
  583. // cw_bpf_debug("found traceparent header value is %s", hf.value.str);
  584. char current_key[CW_HEADER_KEY_LENGTH + 1];
  585. if (bpf_probe_read(current_key, sizeof(current_key), hf.name.str) != 0) {
  586. continue;
  587. }
  588. // cw_bpf_debug("---found traceparent header name is %s", hf.name.str);
  589. // cw_bpf_debug("+++found traceparent header value is %s", hf.value.str);
  590. current_key[CW_HEADER_KEY_LENGTH] = '\0';
  591. // cw_bpf_debug("---11111found cwtrace key is %s", key);
  592. // cw_bpf_debug("+++11111found cwtrace current_key is %s", current_key);
  593. // 简化字符串比较
  594. // if (bpf_memcmp(key, current_key, 6) == 0) {
  595. if (current_key[0] == 'c' && current_key[1] == 'w' && current_key[2] == 't' && current_key[3] == 'r' && current_key[4] == 'a' && current_key[5] == 'c' && current_key[6] == 'e') {
  596. find_w3c = 1;
  597. // cw_bpf_debug("found traceparent header");
  598. // 执行字符串到span context的转换
  599. cw_string_to_span_context(hf.value.str, cw_parent_span_context);
  600. // cw_bpf_debug("11111found traceparent header value is %s", hf.value.str);
  601. break; // 找到后立即退出
  602. }
  603. }
  604. if (find_w3c == 0)
  605. {
  606. generate_random_bytes(cw_parent_span_context->trace_id, TRACE_ID_SIZE);
  607. cw_bpf_debug("enter uprobe_http2Server_operateHeader, generate the traceid\n");
  608. }
  609. cw_save_parent_tracking_span(cw_parent_span_context);
  610. return 0;
  611. }
  612. // func (ht *http2Server) WriteStatus(s *Stream, st *status.Status)
  613. // https://github.com/grpc/grpc-go/blob/bcf9171a20e44ed81a6eb152e3ca9e35b2c02c5d/internal/transport/http2_server.go#L1049
  614. //
  615. // This is only compatible with versions > 1.40 and < 1.69.0 of the Server.
  616. // SEC("uprobe/http2Server_WriteStatus")
  617. // int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
  618. // // cw_bpf_debug("enter uprobe_http2Server_WriteStatus\n");
  619. // void *status_ptr = get_argument(ctx, 3);
  620. // return writeStatus(ctx, status_ptr);
  621. // }
  622. // // func (ht *http2Server) writeStatus(s *Stream, st *status.Status)
  623. // // https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/internal/transport/http2_server.go#L1045
  624. // //
  625. // // This is only compatible with versions > 1.69.0 of the Server.
  626. // SEC("uprobe/http2Server_WriteStatus2")
  627. // int uprobe_http2Server_WriteStatus2(struct pt_regs *ctx) {
  628. // // cw_bpf_debug("enter uprobe_http2Server_WriteStatus2\n");
  629. // u64 server_stream_pos = 2;
  630. // void *server_stream_ptr = get_argument(ctx, server_stream_pos);
  631. // if (server_stream_ptr == NULL) {
  632. // cw_bpf_debug("grpc:server:uprobe/http2Server_WriteStatus2: failed to get ServerStream arg");
  633. // return -1;
  634. // }
  635. // void *stream_ptr;
  636. // long rc = bpf_probe_read_user(
  637. // &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
  638. // if (rc != 0) {
  639. // cw_bpf_debug("grpc:server:uprobe/http2Server_WriteStatus2: failed to read stream_ptr");
  640. // return -2;
  641. // }
  642. // void *status_ptr = get_argument(ctx, 3);
  643. // return writeStatus(ctx, status_ptr);
  644. // }