| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403 |
// PROTOCOL_* identifiers: stored in l7_request.protocol and l7_event.protocol.
// NOTE(review): PROTOCOL_TRACE is not referenced in the visible part of this file — confirm its consumer.
- #define PROTOCOL_TRACE 200
- #define PROTOCOL_UNKNOWN 0
- #define PROTOCOL_HTTP 1
- #define PROTOCOL_POSTGRES 2
- #define PROTOCOL_REDIS 3
- #define PROTOCOL_MEMCACHED 4
- #define PROTOCOL_MYSQL 5
- #define PROTOCOL_MONGO 6
- #define PROTOCOL_KAFKA 7
- #define PROTOCOL_CASSANDRA 8
- #define PROTOCOL_RABBITMQ 9
- #define PROTOCOL_NATS 10
- #define PROTOCOL_HTTP2 11
- #define PROTOCOL_DUBBO2 12
- #define PROTOCOL_DNS 13
- #define PROTOCOL_DM 14
- #define PROTOCOL_MARIADB 15
- #define PROTOCOL_GRPC 16
// STATUS_* identifiers: STATUS_UNKNOWN is the value l7_event.status is reset to before classification.
- #define STATUS_UNKNOWN 0
- #define STATUS_OK 200
- #define STATUS_FAILED 500
// METHOD_* identifiers: stored in l7_event.method (and compared against nats_method()'s result).
- #define METHOD_UNKNOWN 0
- #define METHOD_PRODUCE 1
- #define METHOD_CONSUME 2
- #define METHOD_STATEMENT_PREPARE 3
- #define METHOD_STATEMENT_CLOSE 4
- #define METHOD_HTTP2_CLIENT_FRAMES 5
- #define METHOD_HTTP2_SERVER_FRAMES 6
/* Size in bytes of the l7_event.error_message buffer. */
#define ERROR_MSG_PAYLOAD_SIZE 128

/*
 * Capacity in bytes of every captured payload buffer (l7_event.payload,
 * l7_request.payload, ...). TRUNCATE_PAYLOAD_SIZE masks with
 * MAX_PAYLOAD_SIZE-1, which is only a valid bound when the value is a
 * power of two, so that invariant is checked at compile time.
 */
#define MAX_PAYLOAD_SIZE 1024 // must be power of 2
_Static_assert((MAX_PAYLOAD_SIZE & (MAX_PAYLOAD_SIZE - 1)) == 0,
               "MAX_PAYLOAD_SIZE must be a power of 2");

/*
 * Clamps the lvalue `size` in place to at most MAX_PAYLOAD_SIZE-1.
 * The MIN() does the arithmetic clamp; the inline-asm AND repeats the bound
 * in a form the compiler cannot optimise away (presumably so the BPF
 * verifier can see the bound — confirm). `size` is evaluated more than
 * once, so it must be a plain variable without side effects.
 */
#define TRUNCATE_PAYLOAD_SIZE(size) ({                                  \
    size = MIN(size, MAX_PAYLOAD_SIZE-1);                               \
    asm volatile ("%0 &= %1" : "+r"(size) : "i"(MAX_PAYLOAD_SIZE-1));   \
})

/*
 * Copies `size` bytes (clamped in place by TRUNCATE_PAYLOAD_SIZE) from `src`
 * to `dst` with bpf_probe_read().
 * WARNING: hidden control flow — on a failed read this macro executes
 * `return 0;` in the CALLING function.
 */
#define COPY_PAYLOAD(dst, size, src) ({                                 \
    TRUNCATE_PAYLOAD_SIZE(size);                                        \
    if (bpf_probe_read(dst, size, src)) {                               \
        return 0;                                                       \
    }                                                                   \
})

/* Nanoseconds per second. */
#define NS_PER_SEC 1000000000ULL

/*
 * Size of the per-CPU scratch buffer used to linearise iovec payloads.
 * Parenthesised so the macro stays a single operand when used inside a
 * larger expression (e.g. `x / IOVEC_BUF_SIZE`).
 */
#define IOVEC_BUF_SIZE (MAX_PAYLOAD_SIZE * 2) // must be double of MAX_PAYLOAD_SIZE

/* Maximum number of iovec entries read_iovec() will inspect. */
#define MAX_IOVEC_SIZE 32
- #include "http.c"
- #include "postgres.c"
- #include "redis.c"
- #include "memcached.c"
- #include "mysql.c"
- #include "mongo.c"
- #include "kafka.c"
- #include "cassandra.c"
- #include "rabbitmq.c"
- #include "nats.c"
- #include "http2.c"
- #include "dubbo2.c"
- #include "dns.c"
- #include "dm.c"
// Event record submitted to user space via bpf_perf_event_output() on the l7_events map.
- // Counterpart of the Go-side `l7Event` and `RequestData` structs (presumably must stay layout-compatible with them — confirm).
- struct l7_event {
- __u64 fd;
- __u64 connection_timestamp;
- __u32 pid;
- __u32 status;
- __u64 duration;
- __u8 protocol;
- __u8 method;
- __u16 padding;
- __u32 statement_id;
- __u64 payload_size;
- __u64 trace_id;
- __u64 start_at;
- __u64 end_at;
- __u32 trace_start;
- __u32 trace_end;
- __u32 trace_type; // 0: normal, 1: grpc-server, 2: https 3 mq
- __u32 event_count;
- __u16 sport;
- __u16 dport;
- __u8 saddr[16];
- __u8 daddr[16];
- __u16 component_sport;
- __u16 component_dport;
- __u16 is_tls;
- __u8 component_saddr[16];
- __u8 component_daddr[16];
- unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
- unsigned char span_id[APM_SPAN_ID_SIZE];
- unsigned char trace_id_from[APM_TRACE_ID_SIZE];
- unsigned char called_id[APM_ASSUMED_APP_ID_SIZE];
- unsigned char instance_id_from[APM_INSTANCE_ID_SIZE];
- unsigned char app_id_from[APM_APP_ID_SIZE];
- unsigned char span_id_from[APM_SPAN_ID_SIZE];
- unsigned char type_from[APM_TYPE_FROM_SIZE];
- unsigned char target_addr[64];
- // Error message field
- unsigned char error_message[ERROR_MSG_PAYLOAD_SIZE];
- // __u32 test_id;
- // MQ-related information (for message queues such as Kafka and RabbitMQ)
- struct mq_info_t mq;
- char payload[MAX_PAYLOAD_SIZE];
- } ;
// Value type of the test_heap map; in the visible source it is only referenced from commented-out debug code.
- struct test_t {
- __u32 test_id;
- };
// Per-CPU, single-slot scratch storage for a struct l7_event.
// Handlers look up slot 0, fill the event in place and then emit it; the slot is reused, so
// fields a handler does not assign keep whatever the previous user on that CPU wrote.
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, int);
- __type(value, struct l7_event);
- __uint(max_entries, 1);
- } l7_event_heap SEC(".maps");
// Per-CPU, single-slot map holding a struct test_t (only used by commented-out debug code in the visible source).
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, int);
- __type(value, struct test_t);
- __uint(max_entries, 1);
- } test_heap SEC(".maps");
// Perf event array through which struct l7_event records are sent to user space
// (target of the bpf_perf_event_output() calls in this file).
- struct {
- __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
- __uint(key_size, sizeof(int));
- __uint(value_size, sizeof(int));
- } l7_events SEC(".maps");
// Arguments of an in-flight read, saved by trace_enter_read() and consumed by trace_exit_read_common().
- struct read_args {
- __u64 fd; // descriptor being read
- char* buf; // destination buffer, or the iovec array when iovlen != 0
- __u64* ret; // optional pointer to the byte count; when non-NULL it is fetched with bpf_probe_read() at exit
- __u64 iovlen; // number of iovec entries; 0 means buf is a plain buffer
- };
// In-flight reads: key is the __u64 id passed to trace_enter_read(), value is a struct read_args.
// The entry is removed again by trace_exit_read_common().
- struct {
- __uint(type, BPF_MAP_TYPE_LRU_HASH);
- __uint(key_size, sizeof(__u64));
- __uint(value_size, sizeof(struct read_args));
- __uint(max_entries, 10240);
- } active_reads SEC(".maps");
// Key identifying a pending L7 request in the active_l7_requests* maps.
- struct l7_request_key {
- __u64 fd;
- __u32 pid;
- __u16 is_tls;
- __s16 stream_id; // -1 unless a protocol parser (DNS, Cassandra) fills in a per-stream id
- };
// A request captured on the write path and kept until the matching response is seen.
- struct l7_request {
- __u64 ns; // start timestamp from bpf_ktime_get_ns()
- __u8 protocol; // one of PROTOCOL_*
- __u8 partial;
- __u8 request_type; // written by the Postgres/MySQL/DM query parsers
- __s32 request_id; // written by is_kafka_request()
- __u64 trace_id;
- unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
- unsigned char span_id[APM_SPAN_ID_SIZE];
- __u64 payload_size; // size recorded before the payload copy clamps it
- char payload[MAX_PAYLOAD_SIZE];
- };
// Per-CPU, single-slot scratch storage for building a struct l7_request before it is
// copied into active_l7_requests. The slot is reused across invocations.
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, int);
- __type(value, struct l7_request);
- __uint(max_entries, 1);
- } l7_request_heap SEC(".maps");
// Pending requests awaiting a response: struct l7_request_key -> struct l7_request.
- struct {
- __uint(type, BPF_MAP_TYPE_LRU_HASH);
- __uint(key_size, sizeof(struct l7_request_key));
- __uint(value_size, sizeof(struct l7_request));
- __uint(max_entries, 32768);
- } active_l7_requests SEC(".maps");
// Per-connection state kept for the DM protocol (PROTOCOL_DM) between a query and later packets.
- struct l7_request_dm_ctx {
- // __u8 request_type;
- __u64 req_start_at_ns; // set from bpf_ktime_get_ns() when a DM_QUERY is seen
- // char db_name[128] ;
- // __s32 srv_encoding ;
- __u64 query_sql_payload_size;
- char query_sql_payload[MAX_PAYLOAD_SIZE]; // copy of the DM_QUERY payload
- __u32 status;
- // __s32 svr_err_code;
- // __u64 svr_err_msg_size;
- // char svr_err_msg[MAX_PAYLOAD_SIZE];
- };
// Per-CPU, single-slot scratch storage used to initialise a struct l7_request_dm_ctx
// before inserting it into active_l7_requests_dm_ctx.
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, int);
- __type(value, struct l7_request_dm_ctx);
- __uint(max_entries, 1);
- } l7_request_dm_ctx_heap SEC(".maps");
// DM protocol context per request key: struct l7_request_key -> struct l7_request_dm_ctx.
- struct {
- __uint(type, BPF_MAP_TYPE_LRU_HASH);
- __uint(key_size, sizeof(struct l7_request_key));
- __uint(value_size, sizeof(struct l7_request_dm_ctx));
- __uint(max_entries, 32768);
- } active_l7_requests_dm_ctx SEC(".maps");
// Map from struct l7_request_key to a MYSQL_PACKAGE_HEADER_LEN-byte value.
// NOTE(review): not used in the visible part of the file; presumably caches a MySQL response packet header — confirm.
- struct {
- __uint(type, BPF_MAP_TYPE_LRU_HASH);
- __uint(key_size, sizeof(struct l7_request_key));
- __uint(value_size, MYSQL_PACKAGE_HEADER_LEN);
- __uint(max_entries, 32768);
- } active_l7_requests_mysql_resp_header_ctx SEC(".maps");
// Per-CPU, single-slot byte buffer of IOVEC_BUF_SIZE bytes into which read_iovec()
// gathers the contents of scattered iovec entries.
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, int);
- __type(value, char[IOVEC_BUF_SIZE]);
- __uint(max_entries, 1);
- } iovec_buf_heap SEC(".maps");
// Hand-written argument layout for read/write style sys_enter tracepoints.
// NOTE(review): presumably mirrors the kernel's tracepoint record (8 unused header bytes, syscall id, then args) — confirm against the tracepoint format file.
- struct trace_event_raw_sys_enter_rw__stub {
- __u64 unused;
- long int id;
- __u64 fd;
- char* buf;
- __u64 size;
- };
// Hand-written argument layout for read/write style sys_exit tracepoints.
// NOTE(review): presumably mirrors the kernel's tracepoint record — confirm against the tracepoint format file.
- struct trace_event_raw_sys_exit_rw__stub {
- __u64 unused;
- long int id;
- long int ret;
- };
// One scatter/gather entry as read by read_iovec(): a buffer pointer and its length in bytes.
- struct l7_iovec {
- char* buf;
- __u64 size;
- };
// Local description of a message header with an iovec array (msg_iov / msg_iovlen).
// NOTE(review): presumably mirrors the user-space msghdr passed to sendmsg/recvmsg — confirm against callers outside this view.
- struct l7_user_msghdr {
- void *msg_name;
- int msg_namelen;
- struct l7_iovec *msg_iov;
- __u64 msg_iovlen;
- void *msg_control;
- __u64 msg_controllen;
- __u32 msg_flags;
- };
// Per-CPU, single-slot scratch storage for a struct apm_span_context.
// NOTE(review): not referenced in the visible part of the file.
- struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
- __type(key, int);
- __type(value, struct apm_span_context);
- __uint(max_entries, 1);
- } apm_span_context_heap3 SEC(".maps");
// Trace-id length constant and forward declarations of helpers defined elsewhere.
// NOTE(review): none of these four helpers is called in the visible part of this file.
- #define TRACE_ID_SIZE 16
- static __inline __u32 has_cw_header(const char *data);
- static __always_inline void cw_string_to_span_context(char *str, struct apm_span_context *ctx);
- static __always_inline void generate_random_bytes(unsigned char *buff, __u32 size);
- static __inline __attribute__((__always_inline__)) void cw_save_parent_tracking_span(struct apm_span_context *sc);
- static inline __attribute__((__always_inline__))
- void send_event(void *ctx, struct l7_event *e, struct connection_id cid, struct connection *conn) {
- e->connection_timestamp = conn->timestamp;
- e->fd = cid.fd;
- e->pid = cid.pid;
- long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
- if (error ==0){
- cw_add_event_count(e->trace_id);
- }
- }
// Gathers the leading bytes of a user-space iovec array into the contiguous buffer `buf`.
//   iovec      - user-space address of an array of struct l7_iovec
//   iovlen     - number of entries in that array (at most MAX_IOVEC_SIZE are inspected)
//   ret        - known byte count, or 0 when unknown; caps the copy at MIN(ret, MAX_PAYLOAD_SIZE)
//   buf        - destination; callers in this file pass the IOVEC_BUF_SIZE-byte iovec_buf_heap slot
//   total_size - in/out accumulator: the size of every non-empty entry visited is ADDED to it
// Returns the number of bytes copied into buf, or 0 if any bpf_probe_read() fails
// (bytes copied before the failure are then not reported).
- static inline __attribute__((__always_inline__))
- __u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf, __u64 *total_size) {
- struct l7_iovec iov = {};
- __u64 max = (ret) ? MIN(ret, MAX_PAYLOAD_SIZE) : MAX_PAYLOAD_SIZE;
- __u64 offset = 0;
- __u64 size = 0;
- #pragma unroll
- for (int i = 0; i < MAX_IOVEC_SIZE; i++) {
- if (i >= iovlen) {
- break;
- }
- if (bpf_probe_read(&iov, sizeof(iov), (void *)(iovec+i*sizeof(iov)))) {
- return 0;
- }
- if (iov.size <= 0) { // size is unsigned, so this only skips zero-length entries
- continue;
- }
- *total_size += iov.size; // counted even when nothing more is copied
- if (offset < max) {
- size = MIN(iov.size, max-offset);
- TRUNCATE_PAYLOAD_SIZE(size); // both values are clamped to MAX_PAYLOAD_SIZE-1, so
- TRUNCATE_PAYLOAD_SIZE(offset); // offset + size stays below IOVEC_BUF_SIZE (2 * MAX_PAYLOAD_SIZE)
- if (bpf_probe_read(buf + offset, size, (void *)iov.buf)) {
- return 0;
- }
- offset += size;
- }
- }
- return offset;
- }
- static inline __attribute__((__always_inline__))
- int trace_dns_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, __u64 iovlen) {
- __u64 id = bpf_get_current_pid_tgid();
- __u32 zero = 0;
- __u32 pid, tid;
- __u32 http_status ;
- pid = id >> 32;
- tid = (__u32)id;
- if (load_filter_pid() != 0 && pid != load_filter_pid()) {
- return 0;
- }
- char* payload = buf;
- __u64 total_size = 0;
- if (iovlen) {
- payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
- if (!payload) {
- return 0;
- }
- size = read_iovec(buf, iovlen, 0, payload, &total_size);
- }
- if (!size) {
- return 0;
- }
- struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
- if (!req) {
- return 0;
- }
- req->protocol = PROTOCOL_UNKNOWN;
- req->partial = 0;
- req->request_id = 0;
- req->ns = 0;
- req->payload_size = size;
- struct l7_request_key k = {};
- k.pid = id >> 32;
- k.fd = fd;
- k.is_tls = is_tls;
- k.stream_id = -1;
- if (is_dns_request(payload, size, &k.stream_id)) {
- req->protocol = PROTOCOL_DNS;
- }
- if (req->protocol == PROTOCOL_UNKNOWN) {
- return 0;
- }
- if (req->ns == 0) {
- req->ns = bpf_ktime_get_ns();
- }
- COPY_PAYLOAD(req->payload, size, payload);
- bpf_map_update_elem(&active_l7_requests, &k, req, BPF_NOEXIST);
- return 0;
- }
- static inline __attribute__((__always_inline__))
- int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, __u64 iovlen) {
- __u64 id = bpf_get_current_pid_tgid();
- __u32 zero = 0;
- __u32 pid, tid;
- __u32 http_status ;
- pid = id >> 32;
- tid = (__u32)id;
- __u64 total_size = size;
- if (load_filter_pid() != 0 && pid != load_filter_pid()) {
- return 0;
- }
- // bpf_printk("trace_enter_write fd(%d) ,size(%d)", fd, size);
- char* payload = buf;
- if (iovlen) {
- payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
- if (!payload) {
- return 0;
- }
- total_size = 0;
- size = read_iovec(buf, iovlen, 0, payload, &total_size);
- }
- if (!size) {
- return 0;
- }
- struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
- if (!req) {
- return 0;
- }
- req->protocol = PROTOCOL_UNKNOWN;
- req->partial = 0;
- req->request_id = 0;
- req->ns = 0;
- req->payload_size = size;
- struct l7_request_key k = {};
- k.pid = pid;
- k.fd = fd;
- k.is_tls = is_tls;
- k.stream_id = -1;
- struct connection_id cid = {};
- cid.pid = pid;
- cid.fd = fd;
- // cw_bpf_debug("enter-payload:%s|type:%s|FD:%d\n",payload,"type",k.fd);
- if (is_http_response(payload, &http_status))
- {
- //处理http请求之前,确认进程信息是否存在
- struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &pid);
- if (!proc_info) {
- cw_bpf_debug("[Trace End in l7][Response][HTTP]:no proc info. pid:%d \n",k.pid);
- return 0;
- }
- struct apm_trace_info_t * start_trace_info = get_trace_info_by_fd(pid, fd);
- if (!start_trace_info) {
- return -1;
- }
- __u64 trace_id = start_trace_info->trace_id;
- __u32 event_count = cw_get_event_count(trace_id);
- // bpf_printk("[Trace End in l7] count(%d) %llu ", event_count, trace_id);
- cw_bpf_debug("[uprobeThread/pidpidpidpid][Trace End in l7][HTTP]pid:[%d]--[%lld]", tid, bpf_ktime_get_ns());
- cw_bpf_debug("[Trace End in l7][Response][HTTP] event_count:%d", event_count);
- cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
- // 发送事件到用户空间 start
- struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- if (!e) {
- cw_clear_trace(pid, tid, fd);
- return -1;
- }
- // parent sc
- struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(start_trace_info->trace_key);
- if(cw_psc){
- // bpf_printk("[pid:%d][goid:%d] find header",pid,start_trace_info->trace_key.goid);
- cw_copy_byte_arrays(cw_psc->trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
- cw_copy_byte_arrays(cw_psc->assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
- cw_copy_byte_arrays(cw_psc->instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
- cw_copy_byte_arrays(cw_psc->app_id, e->app_id_from, APM_APP_ID_SIZE);
- cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
- cw_copy_byte_arrays(cw_psc->type_from, e->type_from, APM_TYPE_FROM_SIZE);
- // for (int i = 0; i < APM_TYPE_FROM_SIZE; i++) {
- // bpf_printk("trace_enter_write - type_from = %02x", e->type_from[i]);
- // }
- }
- struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
- if (!req)
- {
- cw_clear_trace(pid, tid, fd);
- return 0;
- }
- SET_TRACE_END(e, PROTOCOL_HTTP);
- e->start_at = req->ns;
- cw_bpf_debug("req->ns:%llu",req->ns);
- e->end_at = bpf_ktime_get_ns();
- e->duration = e->end_at - e->start_at;
- // cw_bpf_debug("[Response][HTTP]:duration->ns:%d\n",e->duration);
- e->status = http_status;
- e->pid = k.pid;
- e->fd = k.fd;
- // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
- e->trace_type = 0;
- e->trace_id = trace_id;
- e->payload_size = size;
- e->event_count = event_count;
- COPY_PAYLOAD(e->payload, size, payload);
- e->is_tls = is_tls;
- bpf_map_delete_elem(&active_l7_requests, &k);
- // 清除事件计数
- bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
- // 清除业务层trace信息
- clear_parent_span_context_by_trace_key(start_trace_info->trace_key);
- // 清除trace信息
- cw_clear_trace(pid, tid, fd);
- cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
- if (accept_conn) {
- cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- cw_bpf_debug("rock enter the accept_conn function cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- e->sport = accept_conn->sport;
- e->dport = accept_conn->dport;
- __builtin_memcpy(&e->saddr, &accept_conn->saddr, sizeof(e->saddr));
- __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
- // __sync_fetch_and_add(&accept_conn->bytes_sent, total_size);
- cw_bpf_debug("socket sys_exit_accept--- accept_conn daddr=%llu, daddr=%llu\n", e->saddr[10], e->saddr[11]);
- cw_bpf_debug("socket sys_exit_accept--- accept_conn daddr=%llu, daddr=%llu\n", e->saddr[12], e->saddr[13]);
- cw_bpf_debug("socket sys_exit_accept--- accept_conn daddr=%llu, daddr=%llu\n", e->saddr[14], e->saddr[15]);
- }
- bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
- // 发送事件到用户空间 end
- // __u64 k_version = load_filter_pid();
- // cw_bpf_debug("filter_pid:%d", k_version);
- //
- // struct test_t *ttt = bpf_map_lookup_elem(&test_heap, &zero);
- // if (!ttt) {
- // return 0;
- // }
- // struct member_fields_offset *off = bpf_map_lookup_elem(&__members_offset, &zero);
- // if (!off) {
- // return 0;
- // }
- // cw_bpf_debug("off->task__files_offset:%x", off->task__files_offset);
- // cw_bpf_debug("e->test_id1111:%d", ttt->test_id);
- cw_bpf_debug("HTTP_END");
- // //TODO 4 查询
- // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
- // if (accept_conn && !is_tls) {
- // cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // e.sport = accept_conn.sport;
- // e.dport = accept_conn.dport;
- // __builtin_memcpy(&e.saddr, &accept_conn.saddr, sizeof(e.saddr));
- // __builtin_memcpy(&e.daddr, &accept_conn.daddr, sizeof(e.daddr));
- // // __sync_fetch_and_add(&accept_conn->bytes_sent, total_size);
- // }
- return 0;
- }
- struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
- if (!conn) {
- //TODO 4 查询
- // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // struct connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
- // if (accept_conn && !is_tls) {
- // cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // __sync_fetch_and_add(&accept_conn->bytes_sent, total_size);
- // }
- return 0;
- }
- if (!is_tls) {
- __sync_fetch_and_add(&conn->bytes_sent, total_size);
- if(conn->first_write_time == 0){
- conn->first_write_time = bpf_ktime_get_ns();
- }
- }
- if (is_http_request(payload)) {
- cw_bpf_debug("");
- cw_bpf_debug("-----[Kernel HTTP Enter]:pid:[%d]|CURRENT-GOID:[%llu]|FD:[%d]", tid, get_current_goroutine(), k.fd);
- __u8 type = 0;
- __u64 trace_id = 0;
- struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
- struct apm_trace_info_t * trace_info = get_apm_trace_info_by_trace_key(trace_key);
- if (trace_info == NULL) {
- trace_info = get_apm_trace_info_v3(trace_key,id, pid, tid);
- }
- if (trace_info) {
- // cw_bpf_debug("%llu",trace_info->trace_id);
- trace_id = trace_info->trace_id;
- type = trace_info->type;
- }
- req->protocol = PROTOCOL_HTTP;
- req->trace_id = trace_id;
- // cw_bpf_debug("l7.c111 addr is --------:%d,%s",conn->sport,conn->saddr);
- struct apm_span_context * sc = cw_get_current_tracking_span(trace_info);
- if (sc) {
- cw_copy_byte_arrays(sc->assumed_app_id, req->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
- cw_copy_byte_arrays(sc->span_id, req->span_id, APM_SPAN_ID_SIZE);
- // for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
- // cw_bpf_debug("assumed_app_id-assumed_app_id[%d] = %02x", i, req->assumed_app_id[i]);
- // }
- // for (int i = 0; i < APM_SPAN_ID_SIZE; i++) {
- // cw_bpf_debug("cw_get_current_tracking_span-span_id[%d] = %02x", i, req->span_id[i]);
- // }
- }
- // 0默认,1
- cw_bpf_debug("[Kernel HTTP Enter]req-payload:%s [traceid:%llu][type:%d]",payload,trace_id,type);
- } else if (is_postgres_query(payload, size, &req->request_type)) {
- // if (req->request_type == POSTGRES_FRAME_CLOSE) {
- // struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- // if (!e) {
- // return 0;
- // }
- // __u64 trace_id = get_apm_trace_id(pid, tid);
- // e->trace_id = trace_id;
- // e->protocol = PROTOCOL_POSTGRES;
- // e->method = METHOD_STATEMENT_CLOSE;
- // e->payload_size = size;
- // COPY_PAYLOAD(e->payload, size, payload);
- // send_event(ctx, e, cid, conn);
- // return 0;
- // }
- req->protocol = PROTOCOL_POSTGRES;
- } else if (is_redis_query(payload, size)) {
- cw_bpf_debug("[Enter][Redis]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
- req->protocol = PROTOCOL_REDIS;
- } else if (is_memcached_query(payload, size)) {
- cw_bpf_debug("[Enter][MEMCACHE]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
- req->protocol = PROTOCOL_MEMCACHED;
- } else if (is_mysql_query(payload, size, &req->request_type)) {
- cw_bpf_debug("[Enter][Mysql]:thread_id:%d\n",tid);
- if (req->request_type == MYSQL_COM_STMT_CLOSE) {
- return 0;
- struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- if (!e) {
- return 0;
- }
- __u64 trace_id = get_apm_trace_id(pid, tid);
- e->trace_id = trace_id;
- e->protocol = PROTOCOL_MYSQL;
- e->method = METHOD_STATEMENT_CLOSE;
- e->payload_size = size;
- COPY_PAYLOAD(e->payload, size, payload);
- cw_bpf_debug("[Enter][Mysql][Send]:thread_id:%d\n",tid);
- send_event(ctx, e, cid, conn);
- return 0;
- }
- req->protocol = PROTOCOL_MYSQL;
- } else if (is_dm_query(payload, size,&req->request_type)) {
- // cw_bpf_debug("[Request][DM] start -------->");
- req->protocol = PROTOCOL_DM;
- struct l7_request_dm_ctx *dm_ctx ;
- dm_ctx = bpf_map_lookup_elem(&active_l7_requests_dm_ctx, &k);
- if (!dm_ctx) {
- dm_ctx = bpf_map_lookup_elem(&l7_request_dm_ctx_heap, &zero);
- if (!dm_ctx) {
- return 0;
- }
- dm_ctx->req_start_at_ns = 0 ;
- dm_ctx->status = 0;
- bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_NOEXIST);
- // cw_bpf_debug("[Request][DM] init active_l7_requests_dm_ctx,request_type <0x%x> [%d]",req->request_type,req->request_type);
- }
- if (req->request_type == DM_QUERY) {
- dm_ctx->req_start_at_ns=bpf_ktime_get_ns();
- req->ns = dm_ctx->req_start_at_ns ;
- dm_ctx->query_sql_payload_size = size;
- COPY_PAYLOAD(dm_ctx->query_sql_payload, size, payload);
- bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_ANY);
- }
- if(req->request_type == DM_STMT_EXECUTE) {
- bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
- return 0 ;
- }
- // cw_bpf_debug("[Request][DM] is request ,request_type <0x%x> [%d]",req->request_type,req->request_type);
- // cw_bpf_debug("[Request][DM] end <--------");
- } else if (is_mongo_query(payload, size)) {
- req->protocol = PROTOCOL_MONGO;
- } else if (is_rabbitmq_produce(payload, size)) {
- struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- if (!e) {
- return 0;
- }
- e->protocol = PROTOCOL_RABBITMQ;
- e->method = METHOD_PRODUCE;
- send_event(ctx, e, cid, conn);
- return 0;
- } else if (nats_method(payload, size) == METHOD_PRODUCE) {
- struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- if (!e) {
- return 0;
- }
- e->protocol = PROTOCOL_NATS;
- e->method = METHOD_PRODUCE;
- send_event(ctx, e, cid, conn);
- return 0;
- } else if (is_cassandra_request(payload, size, &k.stream_id)) {
- // bpf_printk("[cassandra] [start] fd(%d) stream_id(%d) goid(%d)", k.fd, k.stream_id, get_current_goroutine());
- __u32 ctx_id =get_current_goroutine();
- struct thread_ctx_t *current_ctx = bpf_map_lookup_elem(&thread_ctx_map, &ctx_id);
- if (current_ctx) {
- req->trace_id = current_ctx->token;
- bpf_map_delete_elem(&thread_ctx_map, &ctx_id);
- }
- req->protocol = PROTOCOL_CASSANDRA;
- } else if (is_kafka_request(payload, size, &req->request_id)) {
- req->protocol = PROTOCOL_KAFKA;
- struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
- if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
- req->ns = prev_req->ns;
- }
- // } else if (looks_like_http2_frame(payload, size, METHOD_HTTP2_CLIENT_FRAMES)) {
- // struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- // if (!e) {
- // return 0;
- // }
- // e->protocol = PROTOCOL_HTTP2;
- // e->method = METHOD_HTTP2_CLIENT_FRAMES;
- // e->duration = bpf_ktime_get_ns();
- // e->payload_size = size;
- // e->trace_id = get_apm_trace_id(pid, tid);
- // COPY_PAYLOAD(e->payload, size, payload);
- // send_event(ctx, e, cid, conn);
- // return 0;
- } else if (is_dubbo2_request(payload, size)) {
- req->protocol = PROTOCOL_DUBBO2;
- } else if (is_dns_request(payload, size, &k.stream_id)) {
- req->protocol = PROTOCOL_DNS;
- }
- if (req->protocol == PROTOCOL_UNKNOWN) {
- return 0;
- }
- if (req->ns == 0) {
- req->ns = bpf_ktime_get_ns();
- }
- COPY_PAYLOAD(req->payload, size, payload);
- bpf_map_update_elem(&active_l7_requests, &k, req, BPF_NOEXIST);
- return 0;
- }
- static inline __attribute__((__always_inline__))
- int trace_enter_read(__u64 id, __u32 pid, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
- if (load_filter_pid() != 0 && pid != load_filter_pid()) {
- return 0;
- }
- struct read_args args = {};
- args.fd = fd;
- args.buf = buf;
- args.ret = ret;
- args.iovlen = iovlen;
- bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
- return 0;
- }
- // 通用的trace_exit_read逻辑,通过参数控制是否执行bpf_tail_call
- static inline __attribute__((__always_inline__))
- int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret, int tp_tail_call) {
- __u32 tid = (__u32)id;
- if (load_filter_pid() != 0 && pid != load_filter_pid()) {
- return 0;
- }
- struct read_args *args = bpf_map_lookup_elem(&active_reads, &id);
- if (!args) {
- return 0;
- }
- struct l7_request_key k = {};
- k.pid = pid;
- k.fd = args->fd;
- k.is_tls = is_tls;
- k.stream_id = -1;
- bpf_map_delete_elem(&active_reads, &id);
- if (ret <= 0) {
- return 0;
- }
- if (args->ret) {
- if (bpf_probe_read(&ret, sizeof(ret), (void*)args->ret)) {
- return 0;
- };
- if (ret <= 0) {
- return 0;
- }
- }
- __u64 total_size = ret;
- int zero = 0;
- char* payload = args->buf;
- if (args->iovlen) {
- payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
- if (!payload) {
- return 0;
- }
- total_size = 0;
- ret = read_iovec(args->buf, args->iovlen, ret, payload, &total_size);
- if (!ret) {
- return 0;
- }
- }
- struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
- if (!e) {
- return 0;
- }
- e->fd = k.fd;
- e->pid = k.pid;
- e->protocol = PROTOCOL_UNKNOWN;
- e->status = STATUS_UNKNOWN;
- e->method = METHOD_UNKNOWN;
- e->statement_id = 0;
- e->payload_size = 0;
- e->trace_id = 0;
- e->is_tls = is_tls;
- __u8 b[8];
- // bpf_read(payload, b);
- struct connection_id cid = {};
- cid.pid = pid;
- cid.fd = args->fd;
- // 被调用方http入口
- // 作为服务端在走。coroot 原有逻辑是没有的
- if (is_http_request(payload)) {
- //处理http请求之前,确认进程信息是否存在
- struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &pid);
- if (!proc_info) {
- cw_bpf_debug("[Receive][HTTP]:no proc info. pid:%d",k.pid);
- return 0;
- }
- struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
- if (!req)
- {
- return 0;
- } else
- {
- req->protocol = PROTOCOL_HTTP;
- req->payload_size = ret;
- req->ns = bpf_ktime_get_ns();
- COPY_PAYLOAD(req->payload, ret, payload);
- // cw_bpf_debug("[Receive][HTTP]:pid:%d|tid:%d",k.pid,k.fd);
- // cw_bpf_debug("[Receive][HTTP]:is_tls:%d|tid:%d",k.is_tls,k.stream_id);
- bpf_map_update_elem(&active_l7_requests, &k, req, BPF_NOEXIST);
- }
- // cw_bpf_debug("[Receive][HTTP] payload1:%s|type:%s\n",payload,"type");
- // __u64 goid = 0;
- // goid = get_rw_goid(120 * NS_PER_SEC, 1);
- // cw_bpf_debug("[Receive][HTTP] goid:%llu\n",goid);
- // __u64 goid = get_rw_goid(120 * NS_PER_SEC, 1);
- // cw_bpf_debug("[Receive][HTTP]:thread_id:%d|goid:%d|FD:%d\n", tid, goid, k.fd);
- // // apm trace
- // struct apm_trace_key_t trace_key = {0};
- // trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
- //
- // // fd trace
- // struct fd_trace_key_t fd_trace_key = {0};
- // fd_trace_key = get_fd_trace_key(pid, k.fd);
- //
- // // trace info
- struct apm_trace_info_t trace_info = cw_save_trace_info(id,pid, k.fd);
- // __u64 uid_base = bpf_ktime_get_ns();
- // trace_info.trace_id = bpf_get_current_pid_tgid() + uid_base;
- SET_TRACE_START(e, PROTOCOL_HTTP);
- e->trace_type = 0;
- e->trace_id = trace_info.trace_id;
- cw_bpf_debug("\n");
- cw_bpf_debug("[Trace Start in l7][HTTP]pid:[%d]--[%lld]--trace_id:%llu\n", pid, bpf_ktime_get_ns(),trace_info.trace_id);
- cw_bpf_debug("[Trace Start in l7][Receive][HTTP]tid:[%d]|GOID:[%d]|FD:%d\n", tid, trace_info.trace_key.goid,k.fd);
- e->payload_size = ret;
- COPY_PAYLOAD(e->payload, ret, payload);
- // pid_tgid:trace_id
- // thread_trace_key =
- // 入口方法缓存
- // bpf_map_update_elem(&trace_info_heap, &trace_key, &trace_info, BPF_NOEXIST);
- // bpf_map_update_elem(&fd_trace_info_heap, &fd_trace_key, &trace_info, BPF_NOEXIST);
- bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
- cw_bpf_debug("[Receive][HTTP] to user space");
- // 作为服务端统计 bytes_received 使用
- // struct connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
- // cw_bpf_debug("socket accept bytes_received cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // if (accept_conn && !is_tls){
- // cw_bpf_debug("socket accept bytes_received after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // __sync_fetch_and_add(&accept_conn->bytes_received, total_size);
- // }
- //bpf_tail_call PROGUP(l7_http_request)
- cw_bpf_debug("======== PROG_DATA_L7_HTTP_TRACE_ID_UP_IDX ========== __KERNEL_FROM < 512 pid:[%d] ",tid);
- if (tp_tail_call && proc_info->code_type != CodeTypeGo) {
- bpf_tail_call(ctx, &NAME(progs_jmp_tp_map), PROG_DATA_L7_HTTP_TRACE_ID_TP_IDX);
- }
- return 0;
- }
- struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
- if (args && !conn) {
- bpf_map_delete_elem(&active_reads, &id);
- // struct connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
- // cw_bpf_debug("socket accept bytes_received cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // if (accept_conn && !is_tls){
- // cw_bpf_debug("socket accept bytes_received after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
- // __sync_fetch_and_add(&accept_conn->bytes_received, total_size);
- // }
- return 0;
- }
- //TODO 5 同发送逻辑。
- /// coroot 是以客户端为主体做统计的,所以这里是客户端逻辑
- if (!is_tls) {
- __sync_fetch_and_add(&conn->bytes_received, total_size);
- if(conn->first_read_time == 0){
- conn->first_read_time = bpf_ktime_get_ns();
- }
- conn->new_read_time = bpf_ktime_get_ns();
- }
- if (is_rabbitmq_consume(payload, ret)) {
- e->protocol = PROTOCOL_RABBITMQ;
- e->method = METHOD_CONSUME;
- send_event(ctx, e, cid, conn);
- return 0;
- }
- if (nats_method(payload, ret) == METHOD_CONSUME) {
- e->protocol = PROTOCOL_NATS;
- e->method = METHOD_CONSUME;
- send_event(ctx, e, cid, conn);
- return 0;
- }
- struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
- int response = 0;
- if (!req) {
- // cw_bpf_debug("no req? 6:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
- if (is_dns_response(payload, ret, &k.stream_id, &e->status)) {
- // cw_bpf_debug("dns");
- req = bpf_map_lookup_elem(&active_l7_requests, &k);
- if (!req) {
- return 0;
- }
- e->protocol = PROTOCOL_DNS;
- e->start_at = req->ns;
- e->end_at = bpf_ktime_get_ns();
- e->duration = e->end_at - req->ns;
- e->payload_size = ret;
- e->trace_id = get_apm_trace_id(pid, tid);
- COPY_PAYLOAD(e->payload, ret, payload);
- send_event(ctx, e, cid, conn);
- bpf_map_delete_elem(&active_l7_requests, &k);
- return 0;
- } else if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
- // bpf_printk("[cassandra end] fd(%d) k.stream_id(%d) goid(%d)", k.fd,k.stream_id,get_current_goroutine());
- req = bpf_map_lookup_elem(&active_l7_requests, &k);
- if (!req) {
- return 0;
- }
- e->trace_id = req->trace_id;
- response = 1;
- } else {
- // cw_bpf_debug("bb 6:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
- return 0;
- }
- }
- e->protocol = req->protocol;
- e->payload_size = req->payload_size;
- COPY_PAYLOAD(e->payload, req->payload_size, req->payload);
- bpf_map_delete_elem(&active_l7_requests, &k);
- // cw_bpf_debug("delete req--------:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
- if (e->protocol == PROTOCOL_HTTP) {
- // __u64 trace_id = req->trace_id;
- e->trace_id = req->trace_id;
- cw_bpf_debug("l7.c addr is --------:%d,%s",conn->sport,conn->saddr);
- e->component_sport = conn->sport;
- e->component_dport = conn->dport;
- __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
- __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
- // struct apm_span_context * sc = cw_get_current_tracking_span();
- // if (sc) {
- cw_copy_byte_arrays(req->assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
- cw_copy_byte_arrays(req->span_id, e->span_id, APM_SPAN_ID_SIZE);
- // for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
- // cw_bpf_debug("assumed_app_id-assumed_app_id[%d] = %02x", i, req->assumed_app_id[i]);
- // }
- // for (int i = 0; i < APM_SPAN_ID_SIZE; i++) {
- // cw_bpf_debug("cw_get_current_tracking_span-span_id[%d] = %02x", i, req->span_id[i]);
- // }
- // }
- // cw_bpf_debug("[Response][HTTP222]:thread_id:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
- // cw_bpf_debug("[Response][HTTP222] trace_id:%llu", trace_id);
- // // 请求报文
- // cw_bpf_debug("[Response][HTTP222] req-payload:%s",e->payload);
- // // 响应报文
- // cw_bpf_debug("[Response][HTTP222] resp-payload:%s",payload);
- response = is_http_response(payload, &e->status);
- // cw_bpf_debug("[Kernel End][HTTP]:pid:[%d]|CURRENT-GOID:[%llu]|trace_id:[%llu]---------\n", tid, get_current_goroutine(),e->trace_id);
- } else if (e->protocol == PROTOCOL_POSTGRES) {
- // __u64 trace_id = get_apm_trace_id(pid, tid);
- // cw_bpf_debug("[postgres sql] trace_id:%llu", trace_id);
- // e->trace_id = req->trace_id;
- e->component_sport = conn->sport;
- e->component_dport = conn->dport;
- __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
- __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
- response = is_postgres_response(payload, ret, &e->status);
- if (req->request_type == POSTGRES_FRAME_PARSE) {
- e->method = METHOD_STATEMENT_PREPARE;
- }
- } else if (e->protocol == PROTOCOL_REDIS) {
- cw_bpf_debug("[Response][Redis]:TGID:%d|type:%s|FD:%d\n", k.pid, "", k.fd);
- // __u64 trace_id = get_apm_trace_id(pid, tid);
- // e->trace_id = req->trace_id;
- cw_bpf_debug("[Redis] trace_id:%llu", req->trace_id);
- e->component_sport = conn->sport;
- e->component_dport = conn->dport;
- __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
- __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
- response = is_redis_response(payload, ret, &e->status, e->error_message);
- } else if (e->protocol == PROTOCOL_MEMCACHED) {
- cw_bpf_debug("[Response][MEMCACHE]:thread_id:%d\n", tid);
- e->component_sport = conn->sport;
- e->component_dport = conn->dport;
- __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
- __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
- response = is_memcached_response(payload, ret, &e->status, e->error_message);
- } else if (e->protocol == PROTOCOL_MYSQL) {
- cw_bpf_debug("[Response][Mysql]:thread_id:%d\n", tid);
- // __u64 trace_id = get_apm_trace_id(pid, tid);
- // cw_bpf_debug("[Mysql] trace_id:%llu", trace_id);
- // e->trace_id = trace_id;
- //response package parsing partial data (such as by header)
- if(ret == MYSQL_PACKAGE_HEADER_LEN) {
- //sava header to ctx and return
- char resp_packet_header[MYSQL_PACKAGE_HEADER_LEN];
- bpf_probe_read(resp_packet_header,MYSQL_PACKAGE_HEADER_LEN, payload);
- bpf_map_update_elem(&active_l7_requests_mysql_resp_header_ctx, &k, resp_packet_header, BPF_ANY);
- bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
- return 0 ;
- }
- e->component_sport = conn->sport;
- e->component_dport = conn->dport;
- __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
- __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
- char* resp_packet_header = bpf_map_lookup_elem(&active_l7_requests_mysql_resp_header_ctx, &k);
- if(resp_packet_header) {
- char resp_combined_packet[5];
- bpf_probe_read(resp_combined_packet,MYSQL_PACKAGE_HEADER_LEN, resp_packet_header);
- bpf_probe_read(&resp_combined_packet[4],1, payload);
- response = is_mysql_response(resp_combined_packet,sizeof(resp_combined_packet), req->request_type, &e->statement_id, &e->status, e->error_message);
- if(response) {
- bpf_map_delete_elem(&active_l7_requests_mysql_resp_header_ctx, &k);
- }
- } else {
- response = is_mysql_response(payload, ret, req->request_type, &e->statement_id, &e->status, e->error_message);
- }
- if (req->request_type == MYSQL_COM_STMT_PREPARE) {
- e->method = METHOD_STATEMENT_PREPARE;
- }
- } else if (e->protocol == PROTOCOL_DM) {
- // cw_bpf_debug("[Response][DM] start -------->");
- struct l7_request_dm_ctx *dm_ctx = bpf_map_lookup_elem(&active_l7_requests_dm_ctx, &k);
- if (!dm_ctx) {
- return 0;
- }
- //获取server端的字符集类型(用来编码数据库名称,暂时不用)
- // if(req->request_type == DM_VERSION_CLI) {
- // __u8 srv_encode_buf[4];
- // bpf_read(payload+DM_SVR_ENCODING_OFFSET, srv_encode_buf);
- //
- // cw_bpf_debug(" srv_encode_buf:[0x%x]",srv_encode_buf[0]);
- // cw_bpf_debug(" srv_encode_buf:[0x%x]",srv_encode_buf[1]);
- // cw_bpf_debug(" srv_encode_buf:[0x%x]",srv_encode_buf[2]);
- // cw_bpf_debug(" srv_encode_buf:[0x%x]",srv_encode_buf[3]);
- // dm_ctx->srv_encoding = (__s32)(srv_encode_buf[0]& 0xff) | ((__s32)srv_encode_buf[1]&0xff) << 8 | ((__s32)srv_encode_buf[2]&0xff) << 16 | ((__s32)srv_encode_buf[3]&0xff) << 24 ;
- // cw_bpf_debug(" srv_encode_buf:[%d]",dm_ctx->srv_encoding);
- // bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_ANY);
- // return 0;
- // }
- //获取数据库名称(暂时不用)
- // if(req->request_type == DM_DO_LOGIN ) {
- //// //todo 拷贝payload 不进行处理,留给用户空间处理
- //// //pack_type == DM_LOGIN_ACK
- // if (ret == DM_HEADER_SIZE) {
- // bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
- // return 0;
- // }
- //// //获取数据库名称
- // // bpf_read(buf+DM_PAYLOAD_OFFSET, svr_encode_buf);
- // __u8 tmp_1[4];
- // // __u32 tmp_offset = DM_PAYLOAD_OFFSET ;
- // __u32 tmp_offset = 0 ;
- // //1
- // bpf_read(payload+tmp_offset, tmp_1);
- // __s32 tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----1 tmp_len:%d",tmp_len);
- //
- // //2
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----2 tmp_len:%d",tmp_len);
- // //3
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----3 tmp_len:%d",tmp_len);
- // //4
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----4 tmp_len:%d",tmp_len);
- // //5
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----5 tmp_len:%d",tmp_len);
- // //6
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----6 tmp_len:%d",tmp_len);
- // //7
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----7 tmp_len:%d",tmp_len);
- // //8
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----8 tmp_len:%d",tmp_len);
- //
- // //9
- // tmp_offset = tmp_offset + 4 ;
- // //10
- // tmp_offset = tmp_offset + 4 ;
- // //11
- // tmp_offset = tmp_offset + 4 ;
- // //12
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // tmp_offset = tmp_offset + 4 ;
- // tmp_offset = tmp_offset+tmp_len ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----12 tmp_len:%d",tmp_len);
- // //13
- // bpf_read(payload+tmp_offset,tmp_1);
- // tmp_offset = tmp_offset + 4 ;
- // tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
- // cw_bpf_debug("===== DM_LOGIN_ACK ----13 tmp_len:%d",tmp_len);
- //
- // cw_bpf_debug("===== DM_LOGIN_ACK dbName tmp_offset:%d",tmp_offset);
- // if(tmp_len>0){
- //// char dbNameBuff[7] ;
- // char dbNameBuff[128] ;
- // if(tmp_len > 128){
- // tmp_len = 128;
- // }
- // bpf_read(payload+tmp_offset,dbNameBuff);
- // dbNameBuff[tmp_len-1]='\0';
- //// cw_bpf_debug("===== DM-resp DM_LOGIN_ACK dbName[0]:%c",dbNameBuff[0]);
- //// cw_bpf_debug("===== DM-resp DM_LOGIN_ACK dbName[1]:%c",dbNameBuff[1]);
- //// cw_bpf_debug("===== DM-resp DM_LOGIN_ACK dbName[2]:%c",dbNameBuff[2]);
- //// cw_bpf_debug("===== DM-resp DM_LOGIN_ACK dbName[3]:%c",dbNameBuff[3]);
- //// cw_bpf_debug("===== DM-resp DM_LOGIN_ACK dbName[4]:%c",dbNameBuff[4]);
- //// cw_bpf_debug("===== DM-resp DM_LOGIN_ACK dbName[5]:%c",dbNameBuff[5]);
- // cw_bpf_debug("===== DM_LOGIN_ACK dbName:%s",dbNameBuff);
- //
- // __u32 dn_name_size = (__u32)tmp_len + 1 ;
- // COPY_PAYLOAD(dm_ctx->db_name,dn_name_size, dbNameBuff);
- // cw_bpf_debug("===== DM_LOGIN_ACK strcpy(db_name,dbNameBuff):%s",dm_ctx->db_name);
- // bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_ANY);
- // }
- // return 0;
- // }
- // __u64 trace_id = get_apm_trace_id(pid, tid);
- // e->trace_id = req->trace_id;
- e->component_sport = conn->sport;
- e->component_dport = conn->dport;
- __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
- __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
- // cw_bpf_debug("[Response][DM] trace_id:%llu", trace_id);
- response = is_dm_response(payload, ret, req->request_type, &dm_ctx->status);
- // cw_bpf_debug("[Response][DM] is_dm_response status ---------- %d",dm_ctx->status);
- if (response) {
- req->ns = dm_ctx->req_start_at_ns;
- e->status = dm_ctx->status;
- e->payload_size = dm_ctx->query_sql_payload_size;
- COPY_PAYLOAD(e->payload, dm_ctx->query_sql_payload_size, dm_ctx->query_sql_payload);
- bpf_map_delete_elem(&active_l7_requests_dm_ctx, &k);
- // cw_bpf_debug("[Response][DM] is response ,delete active_l7_requests_dm_ctx -- req->request_type<0x%x> , e->payload_size:[%d]",req->request_type,e->payload_size);
- }
- // else {
- // cw_bpf_debug("[Response][DM] not response req->request_type <0x%x>",req->request_type);
- // }
- // cw_bpf_debug("[Response][DM] end <---------\n");
- } else if (e->protocol == PROTOCOL_MONGO) {
- response = is_mongo_response(payload, ret, req->partial);
- if (response == 2) { // partial
- struct l7_request *r = bpf_map_lookup_elem(&l7_request_heap, &zero);
- if (!r) {
- return 0;
- }
- r->partial = 1;
- r->protocol = e->protocol;
- r->ns = req->ns;
- r->payload_size = req->payload_size;
- COPY_PAYLOAD(r->payload, req->payload_size, req->payload);
- bpf_map_update_elem(&active_l7_requests, &k, r, BPF_ANY);
- return 0;
- }
- } else if (e->protocol == PROTOCOL_KAFKA) {
- response = is_kafka_response(payload, req->request_id);
- } else if (e->protocol == PROTOCOL_DUBBO2) {
- response = is_dubbo2_response(payload, &e->status);
- }
- if (!response) {
- return 0;
- }
- if (e->trace_id == 0){
- e->trace_id = get_apm_trace_id(pid,tid);
- }
- e->end_at = bpf_ktime_get_ns();
- e->start_at = req->ns;
- e->duration = e->end_at - e->start_at;
- SET_TRACE_METHOD(e);
- send_event(ctx, e, cid, conn);
- return 0;
- }
- static inline __attribute__((__always_inline__))
- int trace_exit_read_tp(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) {
- return trace_exit_read_common(ctx, id, pid, is_tls, ret, 1); // enable_tail_call = 1
- }
- static inline __attribute__((__always_inline__))
- int trace_exit_read_up(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) {
- return trace_exit_read_common(ctx, id, pid, is_tls, ret, 0); // enable_tail_call = 0
- }
- SEC("tracepoint/syscalls/sys_enter_write")
- int sys_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size, 0);
- }
- SEC("tracepoint/syscalls/sys_enter_writev")
- int sys_enter_writev(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, 0, ctx->size);
- }
- SEC("tracepoint/syscalls/sys_enter_sendmsg")
- int sys_enter_sendmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- struct l7_user_msghdr msghdr = {};
- if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
- return 0;
- }
- return trace_enter_write(ctx, ctx->fd, 0, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
- }
- //struct cw_mmsghdr {
- // struct user_msghdr msg_hdr;
- // __u32 msg_len;
- //};
- SEC("tracepoint/syscalls/sys_enter_sendmmsg")
- int sys_enter_sendmmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- __u64 offset = 0;
- #pragma unroll
- for (int i = 0; i <= 1; i++) {
- if (i >= ctx->size) {
- break;
- }
- struct mmsghdr h = {};
- if (bpf_probe_read(&h , sizeof(h), (void *)(ctx->buf + offset))) {
- return 0;
- }
- offset += sizeof(h);
- trace_dns_enter_write(ctx, ctx->fd, 0, (char*)h.msg_hdr.msg_iov, 0, h.msg_hdr.msg_iovlen);
- }
- return 0;
- }
- SEC("tracepoint/syscalls/sys_enter_sendto")
- int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size, 0);
- }
- SEC("tracepoint/syscalls/sys_enter_read")
- int sys_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- __u64 id = bpf_get_current_pid_tgid();
- __u32 pid = id >> 32;
- return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, 0);
- }
- SEC("tracepoint/syscalls/sys_enter_readv")
- int sys_enter_readv(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- __u64 id = bpf_get_current_pid_tgid();
- __u32 pid = id >> 32;
- return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, ctx->size);
- }
- SEC("tracepoint/syscalls/sys_enter_recvmsg")
- int sys_enter_recvmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- __u64 id = bpf_get_current_pid_tgid();
- struct l7_user_msghdr msghdr = {};
- if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
- return 0;
- }
- __u32 pid = id >> 32;
- return trace_enter_read(id, pid, ctx->fd, (char *) msghdr.msg_iov, 0, msghdr.msg_iovlen);
- }
- SEC("tracepoint/syscalls/sys_enter_recvfrom")
- int sys_enter_recvfrom(struct trace_event_raw_sys_enter_rw__stub* ctx) {
- __u64 id = bpf_get_current_pid_tgid();
- __u32 pid = id >> 32;
- return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, 0);
- }
- SEC("tracepoint/syscalls/sys_exit_read")
- int sys_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
- __u64 pid_tgid = bpf_get_current_pid_tgid();
- __u32 pid = pid_tgid >> 32;
- return trace_exit_read_tp(ctx, pid_tgid, pid, 0, ctx->ret);
- }
- SEC("tracepoint/syscalls/sys_exit_readv")
- int sys_exit_readv(struct trace_event_raw_sys_exit_rw__stub* ctx) {
- __u64 pid_tgid = bpf_get_current_pid_tgid();
- __u32 pid = pid_tgid >> 32;
- return trace_exit_read_tp(ctx, pid_tgid, pid, 0, ctx->ret);
- }
- SEC("tracepoint/syscalls/sys_exit_recvmsg")
- int sys_exit_recvmsg(struct trace_event_raw_sys_exit_rw__stub* ctx) {
- __u64 pid_tgid = bpf_get_current_pid_tgid();
- __u32 pid = pid_tgid >> 32;
- return trace_exit_read_tp(ctx, pid_tgid, pid, 0, ctx->ret);
- }
- SEC("tracepoint/syscalls/sys_exit_recvfrom")
- int sys_exit_recvfrom(struct trace_event_raw_sys_exit_rw__stub* ctx) {
- __u64 pid_tgid = bpf_get_current_pid_tgid();
- __u32 pid = pid_tgid >> 32;
- return trace_exit_read_tp(ctx, pid_tgid, pid, 0, ctx->ret);
- }
- //
- //SEC("tracepoint/syscalls/sys_exit_recvfrom")
- //int sys_exit_recvfrom222(struct trace_event_raw_sys_exit_rw__stub* ctx) {
- // __u64 pid_tgid = bpf_get_current_pid_tgid();
- // __u32 pid = pid_tgid >> 32;
- // if (pid == filterPid) {
- // cw_bpf_debug("sys_exit_recvfrom222");
- // }
- // return 0;
- //}
- //bpf_prog_tp__l7_http_trace_id
- PROGTP(l7_http_trace_id)(void * ctx){
- int zero1 = 0;
- struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero1);
- if (!req) {
- return 0;
- }
- // ---------- 在http请求入口生成 横向串联的trace_id start ----------
- __u32 key = 0;
- __u32 offset = has_cw_header(req->payload);
- struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&apm_span_context_heap3, &key);
- if (cw_parent_span_context == NULL) {
- return -1;
- }
- __builtin_memset(cw_parent_span_context, 0, sizeof(struct apm_span_context));
- if (offset > 0) {
- cw_string_to_span_context(&req->payload[offset], cw_parent_span_context);
- } else {
- generate_random_bytes(cw_parent_span_context->trace_id, TRACE_ID_SIZE);
- }
- // 保存 trace_id 到psc
- cw_save_parent_tracking_span(cw_parent_span_context);
- cw_bpf_debug("[Trace Start in l7][HTTP] trace_id:[%llu]\n", cw_parent_span_context->trace_id);
- // ---------- 在http请求入口生成 横向串联的trace_id end ----------
- return 0;
- }
|