# Kindling agent-libs Userspace Data Flow

*xieyun · June 13, 2022*
## Data Exit: do_inspect()

Parsing is done here; events are obtained from the sinsp layer.

```c
captureinfo do_inspect(sinsp* inspector,
                       uint64_t cnt,
                       uint64_t duration_to_tot_ns,
                       bool quiet,
                       bool json,
                       bool do_flush,
                       bool print_progress,
                       sinsp_filter* display_filter,
                       vector<summary_table_entry> &summary_table,
                       sinsp_evt_formatter* formatter)
{
	int32_t res;
	sinsp_evt* ev;
	......
	while(1)
	{
		res = inspector->next(&ev);
		......
		if(formatter->tostring(ev, &line))
		{
			//
			// Output the line
			//
			if(display_filter)
			{
				if(!display_filter->run(ev))
				{
					continue;
				}
			}

			cout << line << endl;
		}
	}
}
```

Here `sinsp_evt` is defined in libsinsp/event.h and inherits from `gen_event`.

## libsinsp/sinsp.h, sinsp.cpp

```cpp
/*!
  SCAP_SUCCESS if the call is successful and pevent and pcpuid contain valid data.
  SCAP_TIMEOUT in case the read timeout expired and no event is available.
  SCAP_EOF when the end of an offline capture is reached.
  On failure, SCAP_FAILURE is returned and getlasterr() can be used to obtain
  the cause of the error.
*/
virtual int32_t next(OUT sinsp_evt **evt);

int32_t sinsp::next(OUT sinsp_evt **puevt)
{
	sinsp_evt* evt;
	int32_t res;
	evt = &m_evt;

	//
	// Get the event from libscap
	//
	res = scap_next(m_h, &(evt->m_pevt), &(evt->m_cpuid));

	//
	// Store a couple of values that we'll need later inside the event.
	//
	m_nevts++;
	evt->m_evtnum = m_nevts;

	// A filtering step happens here
	m_parser->process_event(evt);

	//
	// If needed, dump the event to file
	//
	......

	//
	// Run the analysis engine? Unclear to me; it does not seem to be implemented.
	// sinsp.ut.cpp:
	//   void process_event(sinsp_evt* evt, event_return rc) override {}
	//
	if (m_external_event_processor)
	{
		m_external_event_processor->process_event(evt, libsinsp::EVENT_RETURN_NONE);
	}

	// Clean parse related event data after analyzer did its parsing too
	m_parser->event_cleanup(evt);

	//
	// Done
	//
	*puevt = evt;
	return res;
}
```

`sinsp::next` is used to return an event; internally it calls the scap library function `scap_next(m_h, &(evt->m_pevt), &(evt->m_cpuid))`.

## libscap

Scap maintains the struct array `m_devs`. As we will see later, each `scap_device` corresponds to one CPU. A `scap_device` contains `m_sn_next_event`, which points to the next event; this is an important pointer.

```cpp
typedef struct scap scap_t;

struct scap
{
	scap_device* m_devs;
	......
};

typedef struct scap_device
{
	int m_fd;
	int m_bufinfo_fd;       // used by udig
	char* m_buffer;
	uint32_t m_buffer_size; // used by udig
	uint32_t m_lastreadsize;
	char* m_sn_next_event;  // Pointer to the next event available for scap_next
	uint32_t m_sn_len;      // Number of bytes available in the buffer pointed by m_sn_next_event
	......
} scap_device;
```

The capture layer mainly contains the implementation of `scap_next`; in live mode the important functions are `scap_open_live_int` and `scap_next_live`.

```cpp
int32_t scap_next(scap_t* handle, OUT scap_evt** pevent, OUT uint16_t* pcpuid)
{
	switch(handle->m_mode)
	{
	case SCAP_MODE_LIVE:
		res = scap_next_live(handle, pevent, pcpuid);
		break;
	}

	// Check to see if the event should be suppressed due
	// to coming from a suppressed tid
	// (not clear to me what a "suppressed" tid is)
	if((res = scap_check_suppressed(handle, *pevent, &suppressed)) != SCAP_SUCCESS)
	{
		return res;
	}

	return res;
}
```
### scap_open_live_int

`scap_open_live_int` allocates a `scap_t` handle, then allocates one device per CPU, loads the BPF programs, and starts the capture.

```cpp
scap_t* scap_open_live_int(char *error, int32_t *rc,
			   proc_entry_callback proc_callback,
			   void* proc_callback_context,
			   bool import_users,
			   const char *bpf_probe,
			   const char **suppressed_comms)
{
	uint32_t j;
	char filename[SCAP_MAX_PATH_SIZE];
	scap_t* handle = NULL;
	uint32_t ndevs;

	//
	// Allocate the handle
	//
	handle = (scap_t*) calloc(sizeof(scap_t), 1);

	//
	// Preliminary initializations
	//
	handle->m_mode = SCAP_MODE_LIVE;
	handle->m_udig = false;

	// Locate the BPF probe
	bpf_probe = scap_get_bpf_probe_from_env();

	// The number of processors configured
	handle->m_ncpus = sysconf(_SC_NPROCESSORS_CONF);

	//
	// Find out how many devices we have to open, which equals to the number of CPUs
	//
	ndevs = sysconf(_SC_NPROCESSORS_ONLN);

	handle->m_devs = (scap_device*) calloc(sizeof(scap_device), ndevs);
	for(j = 0; j < ndevs; j++)
	{
		handle->m_devs[j].m_buffer = (char*)MAP_FAILED;
	}
	handle->m_ndevs = ndevs;

	//
	// Extract machine information
	//
	......

	//
	// Create the interface list and user list
	//
	......

	//
	// Open and initialize all the devices
	//
	*rc = scap_bpf_load(handle, bpf_probe);
	for(j = 0; j < handle->m_ndevs; ++j)
	{
		//
		// Additional initializations
		//
		handle->m_devs[j].m_lastreadsize = 0; // guarantees the first tail update
		handle->m_devs[j].m_sn_len = 0;       // guarantees the first call enters readbuf
		scap_stop_dropping_mode(handle);
	}

	//
	// Create the process list
	//
	......

	//
	// Now that sysdig has done all its /proc parsing, start the capture
	//
	rc = scap_start_capture(handle);
}
```

#### scap_bpf_load

```cpp
int32_t scap_bpf_load(scap_t *handle, const char *bpf_probe)
{
	// ToDo: a batch of map populations
	......

	//
	// Open and initialize all the devices
	//
	online_cpu = 0;
	for(j = 0; j < handle->m_ncpus; ++j)
	{
		int pmu_fd;

		// ToDo: check for online CPUs
		......

		pmu_fd = sys_perf_event_open(&attr, -1, j, -1, 0);
		handle->m_devs[online_cpu].m_fd = pmu_fd;

		//
		// Map the ring buffer
		//
		handle->m_devs[online_cpu].m_buffer = perf_event_mmap(handle, pmu_fd);
	}
}
```

#### sys_perf_event_open

The function used to open the perf fd; it is a thin wrapper over the `perf_event_open` system call.

```cpp
pmu_fd = sys_perf_event_open(&attr, -1, j, -1, 0);

// ToDo: how this perf fd relates to the system call
static int sys_perf_event_open(struct perf_event_attr *attr,
			       pid_t pid, int cpu, int group_fd,
			       unsigned long flags)
{
	return syscall(__NR_perf_event_open, attr, pid, cpu, group_fd, flags);
}
```
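The `bpf_perf_event_output` man-page excerpt quoted further below spells out the attributes such a perf event must have. As a hedged illustration of what the `attr` passed to `sys_perf_event_open` above plausibly looks like (the required `type`/`config`/`sample_type` values come from that man page; `sample_period` and `wakeup_events` are assumptions, not taken from agent-libs):

```c
#include <string.h>
#include <linux/perf_event.h>

/* A minimal sketch, NOT the actual agent-libs code: configure a perf event
 * suitable for receiving BPF output, per the bpf_perf_event_output
 * requirements quoted later in this document. */
static void init_bpf_output_attr(struct perf_event_attr *attr)
{
	memset(attr, 0, sizeof(*attr));
	attr->size          = sizeof(*attr);
	attr->type          = PERF_TYPE_SOFTWARE;       /* required type */
	attr->config        = PERF_COUNT_SW_BPF_OUTPUT; /* required config */
	attr->sample_type   = PERF_SAMPLE_RAW;          /* required sample_type */
	attr->sample_period = 1;                        /* deliver every record (assumption) */
	attr->wakeup_events = 1;                        /* wake the reader per event (assumption) */
}
```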
#### perf_event_mmap

```cpp
// Maps the perf fd into a buffer.
// Two copies of the ring are mapped, but the sizes do not line up, which is odd.
// ToDo:
//   1. Why two copies?
//   2. Why do the sizes not match?
//
// Update: the perf fd carries a header page, and p1 only needs to point at the
// data area, so it does not need space for the header. Perhaps mapping p1 from
// an fd offset of one header page would also make sense, i.e.
//   void *p1 = mmap(tmp + ring_size + header_size, ring_size,
//                   PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, fd, 4096);
// The two copies are presumably there to make boundary handling simpler. That
// effect is real (see scap_bpf.h:141), but why such a large chunk of memory
// just to handle the boundary? The only place I can see that overflows into
// the second copy is that one small region.
static void *perf_event_mmap(scap_t *handle, int fd)
{
	int page_size = getpagesize();
	int ring_size = page_size * BUF_SIZE_PAGES;
	int header_size = page_size;
	int total_size = ring_size * 2 + header_size;

	void *tmp = mmap(NULL, total_size,
			 PROT_READ | PROT_WRITE,
			 MAP_SHARED | MAP_ANONYMOUS, -1, 0);

	// Map the second copy to allow us to handle the wrap case normally
	void *p1 = mmap(tmp + ring_size, ring_size + header_size,
			PROT_READ | PROT_WRITE,
			MAP_SHARED | MAP_FIXED, fd, 0);

	// Map the main copy
	void *p2 = mmap(tmp, ring_size + header_size,
			PROT_READ | PROT_WRITE,
			MAP_SHARED | MAP_FIXED, fd, 0);

	return tmp;
}
```
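The point of the mirror mapping is that an event whose bytes wrap past the end of the ring is still readable through one contiguous pointer, because the same pages appear again immediately after the first copy. A minimal sketch of the idea (hypothetical helper, not agent-libs code):

```c
#include <stdint.h>
#include <string.h>

/* Sketch: reading a record that may straddle the ring boundary. `data` points
 * at the start of the ring's data area, which is followed immediately by a
 * second mapping of the same pages, as set up by perf_event_mmap above. */
static void read_record(const char *data, uint64_t data_size,
			uint64_t tail, void *out, uint64_t len)
{
	/* Without the mirror, a record starting near the end of the ring would
	 * need two memcpy calls (the chunk at the end plus the wrapped chunk).
	 * With the mirror, bytes past data + data_size simply fall into the
	 * second copy, so one contiguous read is always valid as long as
	 * len <= data_size. */
	memcpy(out, data + (tail % data_size), len);
}
```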
#### bpf_perf_event_output

> **long bpf_perf_event_output(void *_ctx_, struct bpf_map *_map_, u64 _flags_, void *_data_, u64 _size_)**
>
> **Description**
> Write raw _data_ blob into a special BPF perf event held by _map_ of type **BPF_MAP_TYPE_PERF_EVENT_ARRAY**. This perf event must have the following attributes: **PERF_SAMPLE_RAW** as **sample_type**, **PERF_TYPE_SOFTWARE** as **type**, and **PERF_COUNT_SW_BPF_OUTPUT** as **config**.
> The _flags_ are used to indicate the index in _map_ for which the value must be put, masked with **BPF_F_INDEX_MASK**. Alternatively, _flags_ can be set to **BPF_F_CURRENT_CPU** to indicate that the index of the current CPU core should be used.
> The value to write, of _size_, is passed through eBPF stack and pointed by _data_.
> The context of the program _ctx_ needs also be passed to the helper.
> On user space, a program willing to read the values needs to call **perf_event_open**() on the perf event (either for one or for all CPUs) and to store the file descriptor into the _map_. This must be done before the eBPF program can send data into it. An example is available in file _samples/bpf/trace_output_user.c_ in the Linux kernel source tree (the eBPF program counterpart is in _samples/bpf/trace_output_kern.c_).
> **bpf_perf_event_output**() achieves better performance than **bpf_trace_printk**() for sharing data with user space, and is much better suitable for streaming data from eBPF programs.
> Note that this helper is not restricted to tracing use cases and can be used with programs attached to TC or XDP as well, where it allows for passing data to user space listeners. Data can be:
> • Only custom structs,
> • Only the packet payload, or
> • A combination of both.
>
> **Return** 0 on success, or a negative error in case of failure.

### scap_next_live

```cpp
static inline int32_t scap_next_live(scap_t* handle, OUT scap_evt** pevent, OUT uint16_t* pcpuid)
{
	uint32_t j;
	scap_evt* pe = NULL;
	uint32_t ndevs = handle->m_ndevs;

	for(j = 0; j < ndevs; j++)
	{
		scap_device* dev = &(handle->m_devs[j]); // fetch the device

		// Update the tail, usually because the events have been consumed
		if(dev->m_sn_len == 0)
		{
			//
			// If we don't have data from this ring, but we are
			// still occupying, free the resources for the
			// producer rather than sitting on them.
			//
			if(dev->m_lastreadsize > 0)
			{
				scap_advance_tail(handle, j);
			}

			continue;
		}

		// Events are fetched differently depending on the probe type;
		// scap_bpf_evt_from_perf_sample seems to simply decode the
		// sample into a scap_evt
		if(handle->m_bpf)
		{
			pe = scap_bpf_evt_from_perf_sample(dev->m_sn_next_event);
		}
		else
		{
			pe = (scap_evt *) dev->m_sn_next_event;
		}

		*pevent = pe;
		*pcpuid = j;
	}

	// Then update what "next" points to
	//
	// Update the pointers.
	//
	if(handle->m_bpf)
	{
		scap_bpf_advance_to_evt(handle, *pcpuid, true,
					dev->m_sn_next_event,
					&dev->m_sn_next_event,
					&dev->m_sn_len);
	}
	else
	{
		ASSERT(dev->m_sn_len >= (*pevent)->len);
		dev->m_sn_len -= (*pevent)->len;
		dev->m_sn_next_event += (*pevent)->len;
	}
}
```

~~The buffer contents come from:~~

```cpp
//
// All the buffers have been consumed. Check if there's enough data to keep going or
// if we should wait.
//
int32_t refill_read_buffers(scap_t* handle)
{
	for(j = 0; j < ndevs; j++)
	{
		struct scap_device *dev = &(handle->m_devs[j]);

		int32_t res = scap_readbuf(handle,
					   j,
					   &dev->m_sn_next_event,
					   &dev->m_sn_len);
		......
	}
}
```

#### m_sn_next_event

```cpp
//
// The device descriptor, used to describe a ring buffer
//
typedef struct scap_device
{
	int m_fd;
	int m_bufinfo_fd;       // used by udig
	char* m_buffer;
	uint32_t m_buffer_size; // used by udig
	uint32_t m_lastreadsize;
	char* m_sn_next_event;  // Pointer to the next event available for scap_next
	uint32_t m_sn_len;      // Number of bytes available in the buffer pointed by m_sn_next_event
	......
} scap_device;
```

This pointer is referenced in a few places; both (1) and (2) below go through `scap_bpf_advance_to_evt`.

(1) scap_bpf_advance_to_evt

```cpp
//
// Update the pointers.
//
if(handle->m_bpf)
{
	scap_bpf_advance_to_evt(handle, *pcpuid, true,
				dev->m_sn_next_event,
				&dev->m_sn_next_event,
				&dev->m_sn_len);
}

static inline int32_t scap_bpf_advance_to_evt(scap_t *handle, uint16_t cpuid,
					      bool skip_current,
					      char *cur_evt,
					      char **next_evt,
					      uint32_t *len)
{
	struct scap_device *dev;
	void *base;
	void *begin;

	dev = &handle->m_devs[cpuid];

	struct perf_event_mmap_page *header = (struct perf_event_mmap_page *) dev->m_buffer;

	base = ((char *) header) + header->data_offset; // pointer to the buffer
	begin = cur_evt; // tail position of the ring buffer

	// remaining length of the current buffer
	while(*len)
	{
		struct perf_event_header *e = begin;

		...... // a batch of length checks and safety checks against out-of-bounds access

		if(skip_current)
		{
			skip_current = false;
		}
		else
		{
			*next_evt = (char *) e;
			break;
		}

		// From here on, advance the event pointer.
		// The pointer runs past the end of the ring:
		if(begin + e->size > base + header->data_size)
		{
			begin = begin + e->size - header->data_size;
		}
		// The pointer lands exactly on the end of the ring:
		else if(begin + e->size == base + header->data_size)
		{
			begin = base;
		}
		else
		{
			// advance directly by the event's size
			begin += e->size;
		}

		*len -= e->size;
	}
}
```

We can see that `head` and `tail` are allowed to run past `data_size`, so they must be taken modulo `data_size` when used (for example, with `data_size = 16`, `tail = 18` and `head = 21` mean the unread data spans offsets 2 through 5). `begin`, in contrast, always cycles within `[base, base + data_size)`, where `base = header + data_offset`.
#### readbuf

Because `m_sn_len` starts at 0, a refill is always triggered first, which initializes `m_sn_next_event`. The initialization also updates the pointer through the same `scap_bpf_advance_to_evt` function mentioned in (1).

Call chain: `scap_readbuf -> scap_bpf_readbuf`.

```cpp
// Refill our data for each of the devices
int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_t* len)
{
	uint32_t thead;
	uint32_t ttail;
	uint64_t read_size;

	if(handle->m_bpf)
	{
		return scap_bpf_readbuf(handle, cpuid, buf, len);
	}

	// What follows is the non-BPF path; ignored for now
	......
}

static inline int32_t scap_bpf_readbuf(scap_t *handle, uint32_t cpuid, char **buf, uint32_t *len)
{
	struct perf_event_mmap_page *header;
	struct scap_device *dev;
	uint64_t tail;
	uint64_t head;
	uint64_t read_size;
	char *p;

	dev = &handle->m_devs[cpuid];

	// perf_event_mmap_page is the structure of the page that can be mapped via mmap
	header = (struct perf_event_mmap_page *) dev->m_buffer;

	scap_bpf_get_buf_pointers((char *) header, &head, &tail, &read_size);

	dev->m_lastreadsize = read_size; // needed later when updating the tail

	// current event
	p = ((char *) header) + header->data_offset + tail % header->data_size;
	*len = read_size;

	return scap_bpf_advance_to_evt(handle, cpuid, false, p, buf, len);
}

static inline void scap_bpf_get_buf_pointers(char *buf, uint64_t *phead,
					     uint64_t *ptail, uint64_t *pread_size)
{
	struct perf_event_mmap_page *header;
	uint64_t begin;
	uint64_t end;

	header = (struct perf_event_mmap_page *) buf;
	*phead = header->data_head;
	*ptail = header->data_tail;

	// The mapped memory can change underneath us; this barrier keeps the
	// compiler from reordering or caching the reads.
	// clang-format off
	asm volatile("" ::: "memory");
	// clang-format on

	begin = *ptail % header->data_size;
	end = *phead % header->data_size;

	// dictated by the ring buffer data structure
	if(begin > end)
	{
		*pread_size = header->data_size - begin + end;
	}
	else
	{
		*pread_size = end - begin;
	}
}
```

#### scap_bpf_evt_from_perf_sample

```cpp
pe = scap_bpf_evt_from_perf_sample(dev->m_sn_next_event);

static inline scap_evt *scap_bpf_evt_from_perf_sample(void *evt)
{
	struct perf_event_sample *perf_evt = (struct perf_event_sample *) evt;
	ASSERT(perf_evt->header.type == PERF_RECORD_SAMPLE);
	return (scap_evt *) perf_evt->data;
}

struct perf_event_sample {
	struct perf_event_header header;
	uint32_t size;
	char data[];
};

struct scap_evt {
	uint64_t ts;      /* timestamp, in nanoseconds from epoch */
	uint64_t tid;     /* the tid of the thread that generated this event */
	uint32_t len;     /* the event len, including the header */
	uint16_t type;    /* the event type */
	uint32_t nparams; /* the number of parameters of the event */
};
```

#### perf_event_mmap_page

```cpp
/*
 * Structure of the page that can be mapped via mmap
 */
struct perf_event_mmap_page
{
	/*
	 * Control data for the mmap() data buffer.
	 *
	 * User-space reading the @data_head value should issue an smp_rmb(),
	 * after reading this value.
	 *
	 * When the mapping is PROT_WRITE the @data_tail value should be
	 * written by userspace to reflect the last read data, after issuing
	 * an smp_mb() to separate the data read from the ->data_tail store.
	 * In this case the kernel will not over-write unread data.
	 *
	 * See perf_output_put_handle() for the data ordering.
	 *
	 * data_{offset,size} indicate the location and size of the perf record
	 * buffer within the mmapped area.
	 */
	__u64 data_head;   /* head in the data section */
	__u64 data_tail;   /* user-space written tail */
	__u64 data_offset; /* where the buffer starts */
	__u64 data_size;   /* data buffer size */
	......
};
```

These are the buffer's control variables: `data_offset` points to the start of the buffer, `data_size` is the size of the whole perf data area, and `data_head`/`data_tail` track how far the ring has been read, with `data_tail` controlled by userspace so that the kernel does not overwrite data that has not yet been read.

The kernel comment says an explicit `smp_mb()` is needed after reading `data_head` and before storing `data_tail`. My understanding is that this prevents reading a stale, cached copy of a value that can change underneath us at runtime, and also blocks compiler optimizations, e.g. GCC keeping the value in a register.

Why frame a span of the buffer with head and tail at all?

- To avoid reading past the events the kernel has produced, i.e. an out-of-bounds read.
- Possibly also to keep the portion currently being read from being overwritten by the kernel (uncertain).
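To make the head/tail protocol concrete, here is a minimal, hedged sketch of the consumer side of a perf ring. This is an illustration only, not agent-libs code; scap's equivalent tail update lives in `scap_advance_tail`, called from `scap_next_live` above.

```c
#include <stdint.h>
#include <linux/perf_event.h>

/* A minimal sketch of the consumer protocol, assuming `header` points at the
 * mmapped perf_event_mmap_page. */
static void consume_ring(struct perf_event_mmap_page *header, uint64_t read_size)
{
	/* 1. Read data_head; the barrier pairs with the kernel's publish
	 *    store so we never read event bytes that are not yet visible. */
	uint64_t head = header->data_head;
	__sync_synchronize(); /* stands in for smp_rmb() */

	/* 2. ... read `read_size` bytes of events between data_tail and head ... */

	/* 3. Only after the data has actually been read, publish the new tail
	 *    so the kernel may reuse that space. The barrier keeps the tail
	 *    store from being reordered before the reads (stands in for smp_mb()). */
	__sync_synchronize();
	header->data_tail += read_size;
}
```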
## driver/bpf

The driver does not parse the perf buffer; it simply outputs data of type `ppm_evt_hdr` into the perf event.

ToDo: how is perf implemented on the kernel side?

```c
static __always_inline int bpf_##x(void *ctx)				\
{									\
	struct filler_data data;					\
	int res;							\
									\
	res = init_filler_data(ctx, &data, is_syscall);			\
	if (res == PPM_SUCCESS) {					\
		if (!data.state->tail_ctx.len)				\
			write_evt_hdr(&data);				\
		res = __bpf_##x(&data);					\
	}								\
									\
	if (res == PPM_SUCCESS)						\
		res = push_evt_frame(ctx, &data);			\
									\
	if (data.state)							\
		data.state->tail_ctx.prev_res = res;			\
									\
	bpf_tail_call(ctx, &tail_map, PPM_FILLER_terminate_filler);	\
	bpf_printk("Can't tail call terminate filler\n");		\
	return 0;							\
}
```

**Ref**

- [Sysdig vs DTrace vs Strace: a technical discussion](https://sysdig.com/blog/sysdig-vs-dtrace-vs-strace-a-technical-discussion/)
- [mmap(2)](https://man7.org/linux/man-pages/man2/mmap.2.html)
- [Memory barriers (zhihu)](https://zhuanlan.zhihu.com/p/31892309)

## Code Structure Overview

Within agent-libs, the data flow can be split by address space into a kernel-space part and a user-space part.

- The kernel-space data flow lives in the driver directory, which defines the BPF programs and, according to the subscription rules from the upper layer, attaches the subscribed BPF programs to the corresponding hook points. When a hook point fires, the BPF program loaded into the kernel is invoked through the callback mechanism; it gathers the required data in that context and sends it to the perf buffer.
- The user-space data flow lives in the userspace directory. Its main flow is: data is read from the perf buffer into the scap layer, and scap processes and wraps it before passing it up to the sinsp layer. (Note: the Kindling Event input of the upper layers is of sinsp type.)

## Perf Buffer Overview

BPF programs need to **send data to user space**, and the BPF perf buffer (perfbuf) is currently the de facto standard for doing so. **perfbuf is a set of per-CPU circular buffers**: each CPU gets its own independent buffer.

When a BPF program uses a perfbuf, it must first prepare the event data and then copy it into the perfbuf before it can be sent to user space. This means **the data is copied twice**:

- first into a **local variable** (the eBPF verifier limits the stack to 512 bytes) or a **per-CPU array** (heap-like scratch space that sidesteps the 512-byte stack limit);
- then into the **perfbuf** itself (which fails if the perf buffer has insufficient space).

In the C code of a BPF program, data is sent to user space via `bpf_perf_event_output(context, &perfbuf, flag, event, sizeof(event))`.

**Note**: since kernel 5.8, eBPF supports the more efficient ring buffer. The ring buffer is not per-CPU, and a BPF program can reserve space directly inside it, so only one copy is needed; it also avoids the wasted copy that perfbuf incurs when it runs out of space. For compatibility reasons, agent-libs currently sticks with the perf buffer. A sketch of the ring-buffer alternative is shown below.
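For contrast, here is a minimal, hedged sketch of the post-5.8 ring-buffer pattern the note above refers to. The event struct, map name, and tracepoint are hypothetical; this is not agent-libs code.

```c
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

/* Hypothetical event payload, for illustration only. */
struct demo_event {
	__u64 ts;
	__u32 pid;
};

/* A BPF_MAP_TYPE_RINGBUF map shared by all CPUs (libbpf-style definition). */
struct {
	__uint(type, BPF_MAP_TYPE_RINGBUF);
	__uint(max_entries, 256 * 1024);
} events SEC(".maps");

SEC("tracepoint/syscalls/sys_enter_close")
int demo_prog(void *ctx)
{
	/* Reserve space directly in the ring buffer: the event is built in
	 * place, so there is only one copy instead of perfbuf's two. */
	struct demo_event *e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
	if (!e)
		return 0; /* buffer full: nothing was copied, unlike perfbuf */

	e->ts = bpf_ktime_get_ns();
	e->pid = bpf_get_current_pid_tgid() >> 32;

	bpf_ringbuf_submit(e, 0);
	return 0;
}

char LICENSE[] SEC("license") = "GPL";
```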
## Probe -> Perf Buffer

When a predefined probe fires, the corresponding FILLER function is invoked through the BPF tail-call mechanism. FILLER() fills the data carrier `data->buf` with event information, which falls into two parts:

- `write_evt_hdr()`: fills the event header (the `ppm_evt_hdr` struct), including the timestamp, TID, event type, and so on.

```c
// 1. The ppm_evt_hdr struct
struct ppm_evt_hdr {
#ifdef PPM_ENABLE_SENTINEL
	uint32_t sentinel_begin;
#endif
	uint64_t ts;      /* timestamp, in nanoseconds from epoch */
	uint64_t tid;     /* the tid of the thread that generated this event */
	uint32_t len;     /* the event len, including the header */
	uint16_t type;    /* the event type */
	uint32_t nparams; /* the number of parameters of the event */
};

// 2. The write_evt_hdr function
static __always_inline void write_evt_hdr(struct filler_data *data)
{
	...
	struct ppm_evt_hdr *evt_hdr = (struct ppm_evt_hdr *)data->buf;
	...
}
```

- `__bpf_##x()`: fills in the event's specific information, such as concrete parameter values, in the form of `ppm_param_info`. In a filler probe, `bpf_val_to_ring(data, val)` calls `__bpf_val_to_ring`, which uses `bpf_probe_read_str` to copy `val` into the buf part of `data`; afterwards the `bpf_perf_event_output` described above pushes `data->buf` into the perf buffer.

```c
// The FILLER macro definition
#define FILLER(x, is_syscall)						\
	static __always_inline int __bpf_##x(struct filler_data *data);	\
									\
	__bpf_section(TP_NAME "filler/" #x)				\
	static __always_inline int bpf_##x(void *ctx)			\
	{								\
		struct filler_data data;				\
		int res;						\
									\
		res = init_filler_data(ctx, &data, is_syscall);		\
		if (res == PPM_SUCCESS) {				\
			if (!data.state->tail_ctx.len)			\
				write_evt_hdr(&data);			\
			res = __bpf_##x(&data);				\
		}							\
									\
		if (res == PPM_SUCCESS)					\
			res = push_evt_frame(ctx, &data);		\
									\
		if (data.state)						\
			data.state->tail_ctx.prev_res = res;		\
									\
		bpf_tail_call(ctx, &tail_map, PPM_FILLER_terminate_filler); \
		bpf_printk("Can't tail call terminate filler\n");	\
		return 0;						\
	}								\
									\
	static __always_inline int __bpf_##x(struct filler_data *data)
```

The code written inside FILLER() in `fillers.h` is in fact the body of `__bpf_##x()`, which contains the filling of the concrete parameter information (a sketch of such a filler appears at the end of this section).

Afterwards, the `push_evt_frame` function uses the BPF helper `bpf_perf_event_output` to push `data->buf` into the perf buffer (`perf_map` is a BPF map of type `BPF_MAP_TYPE_PERF_EVENT_ARRAY`).

```c
// The push_evt_frame function
static __always_inline int push_evt_frame(void *ctx,
					  struct filler_data *data)
{
	...
#ifdef BPF_FORBIDS_ZERO_ACCESS
	int res = bpf_perf_event_output(ctx,
					&perf_map,
					BPF_F_CURRENT_CPU,
					data->buf,
					((data->state->tail_ctx.len - 1) & SCRATCH_SIZE_MAX) + 1);
#else
	int res = bpf_perf_event_output(ctx,
					&perf_map,
					BPF_F_CURRENT_CPU,
					data->buf,
					data->state->tail_ctx.len & SCRATCH_SIZE_MAX);
#endif
	...
	return PPM_SUCCESS;
}

// perf_map
struct bpf_map_def __bpf_section("maps") perf_map = {
	.type = BPF_MAP_TYPE_PERF_EVENT_ARRAY,
	.key_size = sizeof(u32),
	.value_size = sizeof(u32),
	.max_entries = 0,
};
```

At this point, the kernel-side `data` has been stored in the perf buffer.
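To tie the pieces together, here is a hedged sketch of what a filler written with the FILLER macro might look like. The event name `sys_demo_x` is made up, and the helpers `bpf_syscall_get_retval` and `bpf_val_to_ring` are assumed to behave as described above; real fillers live in `fillers.h`.

```c
/* Hypothetical filler for illustration: fill a single return-value
 * parameter into the event. */
FILLER(sys_demo_x, true)
{
	/* Fetch the syscall return value from the filler context ... */
	long retval = bpf_syscall_get_retval(data->ctx);

	/* ... and append it to data->buf as the event's next parameter.
	 * bpf_val_to_ring -> __bpf_val_to_ring copies the value into the
	 * scratch buffer; push_evt_frame later ships the whole buffer to
	 * the perf buffer via bpf_perf_event_output. */
	return bpf_val_to_ring(data, retval);
}
```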
## How the Scap Layer Fetches the Perf Buffer (perf buffer -> user space)

The scap layer is organized around the `scap_t` struct, which serves as the handle and stores the basic state. The handle also maintains `m_devs`, an array of `scap_device` structs, with one `scap_device` per CPU. Each `scap_device` contains `m_sn_next_event`, which points to the next event in the buffer; it is an important pointer.

```c
// 1. typedef struct scap scap_t
struct scap
{
	...
	scap_mode_t m_mode;
	scap_device* m_devs;
	uint32_t m_ndevs;
	...
};

// 2. scap_device
typedef struct scap_device
{
	int m_fd;
	int m_bufinfo_fd;       // used by udig
	char* m_buffer;         // important field
	uint32_t m_buffer_size; // used by udig
	uint32_t m_lastreadsize;
	char* m_sn_next_event;  // Pointer to the next event available for scap_next; important field
	uint32_t m_sn_len;      // Number of bytes available in the buffer pointed by m_sn_next_event
	union
	{
		// Anonymous struct with ppm stuff
		struct
		{
			struct ppm_ring_buffer_info* m_bufinfo;
			struct udig_ring_buffer_status* m_bufstatus; // used by udig
		};
		// Anonymous struct with bpf stuff
		struct
		{
			uint64_t m_evt_lost;
		};
	};
} scap_device;
```

**`scap_bpf_advance_to_evt` is the key function that pulls data out of the perf buffer.** It is called directly by `scap_next_live` when the handle is in BPF mode, or indirectly through the chain `refill_read_buffers -> scap_readbuf -> scap_bpf_readbuf -> scap_bpf_advance_to_evt`.

Inside `scap_bpf_advance_to_evt`, the statement `*next_evt = (char *) e` **hands the current buffer address to the `next_evt` variable** (which at the call sites is usually `dev->m_sn_next_event`) and then performs the ring-pointer wrap-around logic after the assignment.

Once this completes, the data in the kernel's perf buffer has been made available to user space through `dev->m_sn_next_event`.

```c
// 1. scap_bpf_advance_to_evt
static inline int32_t scap_bpf_advance_to_evt(scap_t *handle, uint16_t cpuid,
					      bool skip_current,
					      char *cur_evt,
					      char **next_evt,
					      uint32_t *len)
{
	struct scap_device *dev;
	void *base;
	void *begin;

	dev = &handle->m_devs[cpuid];

	struct perf_event_mmap_page *header = (struct perf_event_mmap_page *) dev->m_buffer;

	base = ((char *) header) + header->data_offset; // pointer to the buffer
	begin = cur_evt; // starting position within the buffer

	// remaining length of the current buffer
	while(*len)
	{
		struct perf_event_header *e = begin;
		...
		if(skip_current)
		{
			skip_current = false;
		}
		else
		{
			*next_evt = (char *) e; // here evt gets its value
			break;
		}

		// the buffer is circular
		if(begin + e->size > base + header->data_size)
		{
			begin = begin + e->size - header->data_size;
		}
		else if(begin + e->size == base + header->data_size)
		{
			begin = base;
		}
		else
		{
			begin += e->size;
		}

		*len -= e->size;
	}

	return SCAP_SUCCESS;
}

// 2. Call path 1
static inline int32_t scap_next_live(scap_t* handle, OUT scap_evt** pevent, OUT uint16_t* pcpuid)
{
	if(*pcpuid != 65535)
	{
		struct scap_device *dev = &handle->m_devs[*pcpuid];

		if(handle->m_bpf)
		{
#ifndef _WIN32
			scap_bpf_advance_to_evt(handle, *pcpuid, true, // called here
						dev->m_sn_next_event,
						&dev->m_sn_next_event,
						&dev->m_sn_len);
#endif
		}
	}
}

// 3. Call path 2
static inline int32_t scap_next_live(scap_t* handle, OUT scap_evt** pevent, OUT uint16_t* pcpuid)
{
	if(*pcpuid != 65535)
	{
	}
	else
	{
		return refill_read_buffers(handle); // step a
	}
}

int32_t refill_read_buffers(scap_t* handle)
{
	for(j = 0; j < ndevs; j++)
	{
		struct scap_device *dev = &(handle->m_devs[j]);

		int32_t res = scap_readbuf(handle, // step b
					   j,
					   &dev->m_sn_next_event,
					   &dev->m_sn_len);
	}

	return SCAP_TIMEOUT;
}

int32_t scap_readbuf(scap_t* handle, uint32_t cpuid, OUT char** buf, OUT uint32_t* len)
{
	if(handle->m_bpf)
	{
		return scap_bpf_readbuf(handle, cpuid, buf, len); // step c
	}

	return SCAP_SUCCESS;
}

static inline int32_t scap_bpf_readbuf(scap_t *handle, uint32_t cpuid, char **buf, uint32_t *len)
{
	return scap_bpf_advance_to_evt(handle, cpuid, false, p, buf, len); // step d
}
```