|
@@ -22,7 +22,12 @@
|
|
|
// limitation on map entry size: https://github.com/iovisor/bcc/issues/2519#issuecomment-534359316
|
|
// limitation on map entry size: https://github.com/iovisor/bcc/issues/2519#issuecomment-534359316
|
|
|
// the default value is 100, but it can be changed by the user
|
|
// the default value is 100, but it can be changed by the user
|
|
|
// we must specify a limit for the verifier
|
|
// we must specify a limit for the verifier
|
|
|
|
|
+#if __KERNEL_FROM >= 512
|
|
|
#define MAX_BATCH_SIZE 10
|
|
#define MAX_BATCH_SIZE 10
|
|
|
|
|
+#else
|
|
|
|
|
+#define MAX_BATCH_SIZE 1
|
|
|
|
|
+#endif
|
|
|
|
|
+
|
|
|
// https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/common/Topic.scala#L30C3-L30C34
|
|
// https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/common/Topic.scala#L30C3-L30C34
|
|
|
#define MAX_TOPIC_SIZE 256
|
|
#define MAX_TOPIC_SIZE 256
|
|
|
// No constraint on the key size, but we must have a limit for the verifier
|
|
// No constraint on the key size, but we must have a limit for the verifier
|
|
@@ -134,7 +139,6 @@ struct tcp_addr {
|
|
|
// 忽略 Zone
|
|
// 忽略 Zone
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-#ifndef NO_HEADER_PROPAGATION
|
|
|
|
|
|
|
|
|
|
static __always_inline int build_contxet_header(struct kafka_header_t *header, struct apm_span_context *span_ctx) {
|
|
static __always_inline int build_contxet_header(struct kafka_header_t *header, struct apm_span_context *span_ctx) {
|
|
|
if (header == NULL || span_ctx == NULL) {
|
|
if (header == NULL || span_ctx == NULL) {
|
|
@@ -215,11 +219,11 @@ static __always_inline int build_contxet_header(struct kafka_header_t *header, s
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static __always_inline int inject_kafka_header(void *message, struct kafka_header_t *header, u64 msg_headers_pos) {
|
|
static __always_inline int inject_kafka_header(void *message, struct kafka_header_t *header, u64 msg_headers_pos) {
|
|
|
- append_item_to_slice(header, sizeof(*header), (void *) (message + msg_headers_pos));
|
|
|
|
|
|
|
+// append_item_to_slice(header, sizeof(*header), (void *) (message + msg_headers_pos));
|
|
|
|
|
+ cw_append_item_to_slice(header, sizeof(*header), (void *) (message + msg_headers_pos));
|
|
|
return 0;
|
|
return 0;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-#endif
|
|
|
|
|
|
|
|
|
|
static __always_inline long collect_kafka_attributes(void *message, struct message_attributes_t *attrs, bool collect_topic, u64 msg_key_pos, u64 msg_topic_pos) {
|
|
static __always_inline long collect_kafka_attributes(void *message, struct message_attributes_t *attrs, bool collect_topic, u64 msg_key_pos, u64 msg_topic_pos) {
|
|
|
if (collect_topic) {
|
|
if (collect_topic) {
|
|
@@ -417,7 +421,6 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
|
|
|
__builtin_memcpy(kafka_request->msgs[i].sc.assumed_app_id, kafka_request->msgs[0].sc.assumed_app_id, APM_APP_ID_SIZE);
|
|
__builtin_memcpy(kafka_request->msgs[i].sc.assumed_app_id, kafka_request->msgs[0].sc.assumed_app_id, APM_APP_ID_SIZE);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-#ifndef NO_HEADER_PROPAGATION
|
|
|
|
|
// Build the header
|
|
// Build the header
|
|
|
if (build_contxet_header(header, &kafka_request->msgs[i].sc) != 0) {
|
|
if (build_contxet_header(header, &kafka_request->msgs[i].sc) != 0) {
|
|
|
cw_bpf_debug("uprobe/WriteMessages: Failed to build header");
|
|
cw_bpf_debug("uprobe/WriteMessages: Failed to build header");
|
|
@@ -427,7 +430,6 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
|
|
|
inject_kafka_header(msg_ptr, header, proc_info->kafka_message_headers_pos);
|
|
inject_kafka_header(msg_ptr, header, proc_info->kafka_message_headers_pos);
|
|
|
// Zero the header for next iteration to avoid stale data
|
|
// Zero the header for next iteration to avoid stale data
|
|
|
__builtin_memset(header, 0, sizeof(*header));
|
|
__builtin_memset(header, 0, sizeof(*header));
|
|
|
-#endif
|
|
|
|
|
kafka_request->valid_messages++;
|
|
kafka_request->valid_messages++;
|
|
|
msg_ptr = msg_ptr + msg_size;
|
|
msg_ptr = msg_ptr + msg_size;
|
|
|
}
|
|
}
|
|
@@ -547,4 +549,4 @@ int uprobe_WriteMessages_Returns(struct pt_regs *ctx) {
|
|
|
bpf_map_delete_elem(&kafka_events, &key);
|
|
bpf_map_delete_elem(&kafka_events, &key);
|
|
|
// don't need to stop tracking the span, as we don't have a context to propagate locally
|
|
// don't need to stop tracking the span, as we don't have a context to propagate locally
|
|
|
return 0;
|
|
return 0;
|
|
|
-}
|
|
|
|
|
|
|
+}
|