| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- #include "arguments.h"
- #include "span_context.h"
- #include "go_types.h"
- #include "go_context.h"
- #include "uprobe.h"
- // #define MAX_SIZE 50
- #define MAX_CONCURRENT 50
- #define MAX_ERROR_LEN 128
- // #define PROTOCOL_GRPC 15
- struct grpc_client_request_t {
- BASE_SPAN_PROPERTIES
- char err_msg[MAX_ERROR_LEN];
- char method[MAX_SIZE];
- char target[MAX_SIZE];
- u32 status_code;
- u64 method_size;
- u64 target_size;
- struct apm_span_context apm_sc;
- struct apm_span_context apm_psc;
- };
- // struct hpack_header_field {
- // struct go_string_ot name;
- // struct go_string_ot value;
- // bool sensitive;
- // };
- struct {
- __uint(type, BPF_MAP_TYPE_HASH);
- __type(key, void *);
- __type(value, struct grpc_client_request_t);
- __uint(max_entries, MAX_CONCURRENT);
- } grpc_client_events SEC(".maps");
- struct {
- __uint(type, BPF_MAP_TYPE_HASH);
- __type(key, u32);
- __type(value, struct apm_trace_key_t);
- __uint(max_entries, MAX_CONCURRENT);
- } streamid_to_span_contexts SEC(".maps");
- // 用于 ClientConn_Invoke 函数的临时存储,避免栈空间超限
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __uint(key_size, sizeof(u32));
- __uint(value_size, sizeof(struct grpc_client_request_t));
- __uint(max_entries, 1);
- } grpc_client_storage_map SEC(".maps");
- // 用于 loopyWriter_headerHandler 的临时存储
- struct header_handler_storage {
- struct span_context sc;
- char val[CW_HEADER_VAL_LENGTH];
- struct hpack_header_field hf;
- };
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __uint(key_size, sizeof(u32));
- __uint(value_size, sizeof(struct header_handler_storage));
- __uint(max_entries, 1);
- } header_handler_storage_map SEC(".maps");
- // Injected in init
- // volatile const u64 clientconn_target_ptr_pos;
- u64 clientconn_target_ptr_pos = 24; //使用固定值24即可,不再处理多版本场景。
- // volatile const u64 httpclient_nextid_pos;
- // u64 httpclient_nextid_pos = 404; //处理多版本,通过变量获取吧
- // volatile const u64 headerFrame_streamid_pos;
- u64 headerFrame_streamid_pos = 0; //使用固定值0即可,不再处理多版本场景。
- // volatile const u64 headerFrame_hf_pos;
- u64 headerFrame_hf_pos = 8; //使用固定值8即可,不再处理多版本场景。
- // volatile const u64 error_status_pos;
- u64 error_status_pos = 0; //使用固定值0即可,不再处理多版本场景。
- // volatile const u64 status_s_pos;
- // static u64 status_s_pos = 0;
- // volatile const u64 status_message_pos;
- u64 status_message_pos = 48; //使用固定值48即可,不再处理多版本场景。
- // volatile const u64 status_code_pos;
- // static u64 status_code_pos = 40;
- // volatile const bool write_status_supported;
- // This instrumentation attaches uprobe to the following function:
- // func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
- SEC("uprobe/ClientConn_Invoke")
- int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
- // cw_bpf_debug("enter the uprobe_ClientConn_Invoke \n");
- // positions
- u64 clientconn_pos = 1;
- u64 method_ptr_pos = 4;
- u64 method_len_pos = 5;
- // struct go_iface go_context = {0};
- // get_Go_context(ctx, 2, 0, true);
- void *context_ptr_val = get_Go_context(ctx, 2, 0, true);
- if (context_ptr_val == NULL)
- {
- return 0;
- }
- // Get key
- void *key = (void *)GOROUTINE(ctx);
- // cw_bpf_debug("enter the uprobe_ClientConn_Invoke key is 0x%llx\n", (u64)key);
- void *grpcReq_ptr = bpf_map_lookup_elem(&grpc_client_events, &key);
- if (grpcReq_ptr != NULL) {
- cw_bpf_debug("uprobe/ClientConn_Invoke already tracked with the current context");
- return 0;
- }
- // 使用 per-cpu array map 存储大变量,避免栈空间超限
- u32 zero = 0;
- struct grpc_client_request_t *grpcReq = bpf_map_lookup_elem(&grpc_client_storage_map, &zero);
- if (grpcReq == NULL) {
- cw_bpf_debug("grpc:client:ClientConn_Invoke: failed to get storage");
- return -1;
- }
-
- // 清零并初始化
- __builtin_memset(grpcReq, 0, sizeof(struct grpc_client_request_t));
- grpcReq->start_time = bpf_ktime_get_ns();
- // Read Method
- void *method_ptr = get_argument(ctx, method_ptr_pos);
- u64 method_len = (u64)get_argument(ctx, method_len_pos);
- u64 method_size = sizeof(grpcReq->method);
- method_size = method_size < method_len ? method_size : method_len;
- bpf_probe_read(&grpcReq->method, method_size, method_ptr);
- grpcReq->method_size = method_size;
- // Read ClientConn.Target
- void *clientconn_ptr = get_argument(ctx, clientconn_pos);
- if (!get_go_string_from_user_ptr((void *)(clientconn_ptr + clientconn_target_ptr_pos),
- grpcReq->target,
- sizeof(grpcReq->target))) {
- cw_bpf_debug("target write failed, aborting ebpf probe");
- return 0;
- }
- grpcReq->target_size = sizeof(grpcReq->target);
- struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
- if(cw_psc){
- set_assumed_app_id_arrays(grpcReq->target, cw_psc->assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
- cw_save_parent_tracking_span(cw_psc);
- }
- // cw_bpf_debug("grpcReq->target is %s\n", grpcReq->target);
- // start_span_params_t start_span_params = {
- // .ctx = ctx,
- // .go_context = &go_context,
- // .psc = &grpcReq->psc,
- // .sc = &grpcReq->sc,
- // .get_parent_span_context_fn = NULL,
- // .get_parent_span_context_arg = NULL,
- // };
- // start_span(&start_span_params);
- // Write event
- bpf_map_update_elem(&grpc_client_events, &key, grpcReq, 0);
- start_tracking_span(context_ptr_val, &grpcReq->sc);
- // cw_bpf_debug("enter the uprobe_ClientConn_Invoke start_tracking_span\n");
- return 0;
- }
- // This instrumentation attaches uprobe to the following function:
- // func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
- SEC("uprobe/ClientConn_Invoke")
- int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
- // cw_bpf_debug("enter the uprobe_ClientConn_Invoke_Returns \n");
- void *key = (void *)GOROUTINE(ctx);
- struct grpc_client_request_t *grpc_span = bpf_map_lookup_elem(&grpc_client_events, &key);
- if (grpc_span == NULL) {
- cw_bpf_debug("event is NULL in ret probe");
- return 0;
- }
- // if (!write_status_supported) {
- // goto done;
- // }
- // Getting the returned response (error)
- // The status code is embedded 3 layers deep:
- // Invoke() error
- // the `error` interface concrete type here is a gRPC `internal.Error` struct
- // type Error struct {
- // s *Status
- // }
- // The `Error` struct embeds a `Status` proto object
- // type Status struct {
- // s *Status
- // }
- // The `Status` proto object contains a `Code` int32 field, which is what we want
- // type Status struct {
- // Code int32
- // Message string
- // Details []*anypb.Any
- // }
- void *resp_ptr = get_argument(ctx, 2);
- if (resp_ptr == 0) {
- // err == nil
- goto done;
- }
- void *status_ptr = 0;
- // get `s` (Status pointer field) from Error struct
- bpf_probe_read_user(&status_ptr, sizeof(status_ptr), (void *)(resp_ptr + error_status_pos));
- // get `s` field from Status object pointer
- void *s_ptr = 0;
- bpf_probe_read_user(&s_ptr, sizeof(s_ptr), (void *)(status_ptr + status_s_pos));
- // Get status code from Status.s pointer
- bpf_probe_read_user(
- &grpc_span->status_code, sizeof(grpc_span->status_code), (void *)(s_ptr + status_code_pos));
- get_go_string_from_user_ptr(
- (void *)(s_ptr + status_message_pos), grpc_span->err_msg, sizeof(grpc_span->err_msg));
- done:
- // cw_bpf_debug("switch to the done position\n");
- grpc_span->end_time = bpf_ktime_get_ns();
- // output_span_event(ctx, grpc_span, sizeof(*grpc_span), &grpc_span->sc);
- stop_tracking_span(&grpc_span->sc, &grpc_span->psc);
- bpf_map_delete_elem(&grpc_client_events, &key);
- __u64 pid_tgid = bpf_get_current_pid_tgid();
- __u64 pid = pid_tgid >> 32;
- struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &pid);
- if (!info) {
- return 0;
- }
- cw_bpf_debug("[Go] [uprobe/setNodeEnter]: proc_info_map::%ld, %d, %d\n", info->code_type);
- struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
- struct apm_trace_info_t * trace_info = get_apm_trace_info_by_trace_key(trace_key);
- if (trace_info == NULL) {
- cw_bpf_debug("enter the trace_info is NULL\n");
- trace_info = get_apm_trace_info_v3(trace_key,pid_tgid, pid, pid_tgid);
- }
-
- u32 zero = 0;
- struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- if (!e) {
- return 0;
- }
- // e->fd = 0;
- // e->pid = pid;
- e->protocol = PROTOCOL_GRPC;
- e->trace_end = 0;
- e->trace_start = 0;
- // e->status = STATUS_UNKNOWN;
- // e->method = METHOD_GRPC;
- e->statement_id = 0;
- e->payload_size = grpc_span->method_size;
- if (trace_info) {
- // cw_bpf_debug("trace_info->trace_id is %llu\n", trace_info->trace_id);
- e->trace_id = trace_info->trace_id;
- }
- if(e->trace_id == 0){
- e->trace_id = get_apm_trace_id(pid,pid_tgid);
- cw_bpf_debug("e->trace_id is %llu\n", e->trace_id);
- }
- // 使用固定长度读取 target 信息,避免验证器复杂性
- if (grpc_span->target_size > 0) {
- // 使用固定的小长度来避免验证器问题
- u32 fixed_size = 32; // 固定32字节,足够大多数target
- if (grpc_span->target_size < fixed_size) {
- fixed_size = (u32)grpc_span->target_size;
- }
- bpf_probe_read(&e->rpc_target, fixed_size, grpc_span->target);
- }
- COPY_PAYLOAD(e->payload, grpc_span->method_size, grpc_span->method);
- // cw_bpf_debug("e->payload is %s\n", e->payload);
- // cw_bpf_debug("grpc_span->target is %s\n", grpc_span->target);
- // COPY_PAYLOAD(e->payload + grpc_span->method_size, grpc_span->target_size, grpc_span->target);
- // cw_bpf_debug("e->payload is %s\n", e->payload);
- // e->payload_size += grpc_span->target_size;
-
- struct apm_span_context * sc = cw_get_current_tracking_span(trace_info);
- if (sc) {
- cw_copy_byte_arrays(sc->assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
- cw_copy_byte_arrays(sc->span_id, e->span_id, APM_SPAN_ID_SIZE);
- }
-
- e->end_at = bpf_ktime_get_ns();
- e->start_at = grpc_span->start_time;
- e->duration = e->end_at - e->start_at;
- // cw_bpf_debug("send_event trace_id is444 %llu\n", e->trace_id);
- long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
- if (error ==0){
- cw_add_event_count(e->trace_id);
- }
- // cw_bpf_debug("enter the uprobe_ClientConn_Invoke_Returns done\n");
- return 0;
- }
- static __always_inline void
- cw_append_item_to_slice(void *new_item, u32 item_size, void *slice_user_ptr) {
- // read the slice descriptor
- struct go_slice_ot slice = {0};
- long res = bpf_probe_read_user(&slice, sizeof(slice), slice_user_ptr);
- if (res != 0) {
- cw_bpf_debug("cw_append_item_to_slice: failed to read slice descriptor, res=%ld\n", res);
- return;
- }
- // cw_bpf_debug("cw_append_item_to_slice len is %d\n", slice.len);
- // cw_bpf_debug("cw_append_item_to_slice cap is %d\n", slice.cap);
- // cw_bpf_debug("cw_append_item_to_slice array is %p\n", slice.array);
- u64 slice_len = slice.len;
- u64 slice_cap = slice.cap;
- if (slice_len < slice_cap && slice.array != NULL) {
- // Room available on current array, append to the underlying array
- // cw_bpf_debug("enter the cw_append_item_to_slice11111\n");
- res = bpf_probe_write_user(slice.array + (item_size * slice_len), new_item, item_size);
- } else {
- // No room on current array - try to copy new one of size item_size * (len + 1)
- u32 alloc_size = item_size * slice_len;
- if (alloc_size >= MAX_SLICE_ARRAY_SIZE) {
- return;
- }
- // Get temporary buffer
- u32 index = 0;
- struct slice_array_buff *map_buff = bpf_map_lookup_elem(&slice_array_buff_map, &index);
- if (!map_buff) {
- return;
- }
- unsigned char *new_slice_array = map_buff->buff;
- // help the verifier
- alloc_size &= (MAX_SLICE_ARRAY_SIZE - 1);
- if (alloc_size + item_size > MAX_SLICE_ARRAY_SIZE) {
- // No room for new item
- return;
- }
- // Append to buffer
- if (slice.array != NULL) {
- bpf_probe_read_user(new_slice_array, alloc_size, slice.array);
- // cw_bpf_debug("append_item_to_slice: copying %d bytes to new array from address 0x%llx",
- // alloc_size,
- // slice.array);
- }
- copy_byte_arrays(new_item, new_slice_array + alloc_size, item_size);
- // Copy buffer to userspace
- u32 new_array_size = alloc_size + item_size;
- void *new_array = write_target_data(new_slice_array, new_array_size);
- if (new_array == NULL) {
- cw_bpf_debug("append_item_to_slice: failed to copy new array to userspace");
- return;
- }
- // Update array pointer of slice
- slice.array = new_array;
- slice.cap++;
- // cw_bpf_debug("enter the cw_append_item_to_slice222222\n");
- }
- // Update len
- slice.len++;
- // cw_bpf_debug("after cw_append_item_to_slice len is %d\n", slice.len);
- long success = bpf_probe_write_user(slice_user_ptr, &slice, sizeof(slice));
- if (success != 0) {
- cw_bpf_debug("append_item_to_slice: failed to update slice in userspace");
- return;
- }
- }
- SEC("uprobe/loopyWriter_headerHandler")
- int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
- void *headerFrame_ptr = get_argument(ctx, 2);
- // cw_bpf_debug("enter the get header handler storage\n");
- __u64 pid_tgid = bpf_get_current_pid_tgid();
- __u32 tgid = pid_tgid >> 32;
- struct ebpf_proc_info *proc_info =
- bpf_map_lookup_elem(&proc_info_map, &tgid);
- if(!proc_info)
- {
- return 0;
- }
- u32 stream_id = 0;
- bpf_probe_read(
- &stream_id, sizeof(stream_id), (void *)(headerFrame_ptr + (headerFrame_streamid_pos)));
- struct apm_trace_key_t *sc_ptr = bpf_map_lookup_elem(&streamid_to_span_contexts, &stream_id);
- if (sc_ptr == NULL) {
- bpf_map_delete_elem(&streamid_to_span_contexts, &stream_id);
- return 0;
- }
-
- // Get storage from per-cpu map
- u32 zero = 0;
- struct header_handler_storage *storage = bpf_map_lookup_elem(&header_handler_storage_map, &zero);
- if (!storage) {
- bpf_map_delete_elem(&streamid_to_span_contexts, &stream_id);
- cw_bpf_debug("Failed to get header handler storage\n");
- return 0;
- }
-
- // Generate span context
- generate_random_bytes(storage->sc.TraceID, TRACE_ID_SIZE);
- generate_random_bytes(storage->sc.SpanID, SPAN_ID_SIZE);
- u32 map_id = 0;
- struct grpc_client_request_t *grpcClientReq = bpf_map_lookup_elem(&grpc_client_storage_map, &map_id);
- if (grpcClientReq == NULL)
- {
- bpf_map_delete_elem(&streamid_to_span_contexts, &stream_id);
- cw_bpf_debug("uprobe_LoopyWriter_HeaderHandler: grpcClientReq is NULL");
- return 0;
- }
- __builtin_memset(grpcClientReq, 0, sizeof(struct grpc_client_request_t));
- grpcClientReq->start_time = bpf_ktime_get_ns();
- // cw_bpf_debug("enter the uprobe_LoopyWriter_HeaderHandler444444\n");
- // struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
- struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(*sc_ptr);
- if(cw_psc){
- bpf_probe_read(&grpcClientReq->apm_psc, sizeof(grpcClientReq->apm_psc), cw_psc);
- copy_byte_arrays(grpcClientReq->apm_psc.trace_id, grpcClientReq->apm_sc.trace_id, APM_TRACE_ID_SIZE);
- generate_random_bytes(grpcClientReq->apm_sc.span_id, APM_SPAN_ID_SIZE);
- copy_byte_arrays(grpcClientReq->apm_psc.assumed_app_id, grpcClientReq->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
- }
- bpf_map_delete_elem(&streamid_to_span_contexts, &stream_id);
- u32 k0 = 0;
- struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
- if (trace_conf) {
- copy_byte_arrays(trace_conf->host_id, grpcClientReq->apm_sc.host_id, APM_HOST_ID_SIZE);
- }
- copy_byte_arrays(proc_info->instance_id, grpcClientReq->apm_sc.instance_id, APM_APP_ID_SIZE);
- copy_byte_arrays(proc_info->app_id, grpcClientReq->apm_sc.app_id, APM_APP_ID_SIZE);
- // cw_bpf_debug("grpcClientReq->apm_sc.app_id is %s\n", grpcClientReq->apm_sc.app_id);
- bpf_map_update_elem(&apm_current_span_context_map, sc_ptr, &grpcClientReq->apm_sc, BPF_ANY);
- // cw_save_current_tracking_span(&grpcClientReq->apm_sc);
- // Strategy: Write key and value separately using write_target_data
- // Both key and value can use write_target_data as it automatically manages memory allocation
- // First write the key
- char tp_key[CW_HEADER_KEY_LENGTH] = CW_HEADER_KEY_VAL;
- char *key_data_addr = cw_write_target_data((void *)tp_key, sizeof(tp_key), proc_info);
- if (key_data_addr == NULL) {
- cw_bpf_debug("Key data write failed\n");
- return 0;
- }
-
- // Prepare and write the value
- char val[CW_HEADER_VAL_LENGTH];
- __builtin_memset(val, 0, sizeof(val));
- span_context_to_cw_string(&grpcClientReq->apm_sc, val);
-
- // Write value using cw_write_target_data (it will automatically allocate next available position)
- char *val_data_addr = cw_write_target_data((void *)val, sizeof(val), proc_info);
- if (val_data_addr == NULL) {
- cw_bpf_debug("Val data write failed\n");
- return 0;
- }
-
- struct go_string_ot key_str = {.str = key_data_addr, .len = sizeof(tp_key)};
- struct go_string_ot val_str = {.str = val_data_addr, .len = sizeof(val)};
-
- // Build header field
- storage->hf.name = key_str;
- storage->hf.value = val_str;
- storage->hf.sensitive = false;
-
- // Append to slice
- void *slice_ptr = (void *)(headerFrame_ptr + headerFrame_hf_pos);
- cw_append_item_to_slice(&storage->hf, sizeof(storage->hf), slice_ptr);
- return 0;
- }
- SEC("uprobe/http2Client_NewStream")
- // func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
- int uprobe_http2Client_NewStream(struct pt_regs *ctx) {
- // cw_bpf_debug("enter the uprobe_http2Client_NewStream \n");
- __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
- struct ebpf_proc_info *info =
- bpf_map_lookup_elem(&proc_info_map, &tgid);
- if (!info) {
- return -1;
- }
- // cw_bpf_debug("info->httpclient_nextid_pos is %d\n", info->httpclient_nextid_pos);
- struct go_iface go_context = {0};
- get_Go_context(ctx, 2, 0, true);
- void *httpclient_ptr = get_argument(ctx, 1);
- u32 nextid = 0;
- bpf_probe_read(&nextid, sizeof(nextid), (void *)(httpclient_ptr + (info->httpclient_nextid_pos)));
- // Get the span context from go context. The mapping is created in the Invoke probe,
- // the context here is derived from the Invoke context.
- // struct span_context *current_span_context = get_parent_span_context(&go_context);
- struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
- bpf_map_update_elem(&streamid_to_span_contexts, &nextid, &trace_key, 0);
- return 0;
- }
|