Prometheus in Practice: Source Code Analysis of scrape

The previous post covered target management and discovery. So how are these targets actually scraped? This post introduces scrape, the data collector.

type scrapePool struct {
	appender storage.SampleAppender

	ctx context.Context

	mtx    sync.RWMutex
	config *config.ScrapeConfig
	client *http.Client
	// Targets and loops must always be synchronized to have the same
	// set of hashes.
	targets map[uint64]*Target
	loops   map[uint64]loop

	// Constructor for new scrape loops. This is settable for testing convenience.
	newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
}
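
The targets map is keyed by a per-target hash, and loops holds one scrape loop per target under the same key. The loop value itself is just an interface; a minimal sketch of what it has to offer, inferred from how it is used later in this post (treat the exact shape as an assumption, the real definition lives in the same source file):

package retrieval

import "time"

// loop is the unit of work the pool manages: something that can be started
// with a scrape interval and timeout, and stopped again when its target
// disappears.
type loop interface {
	run(interval, timeout time.Duration, errc chan<- error)
	stop()
}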

First a scrapePool is defined, the pool that drives scraping; it holds the targets, i.e. the objects that need to be scraped. Recall from the previous post that TargetManager starts each target set produced by discovery:

go func(ts *targetSet) {
	ts.ts.Run(ctx)
	ts.sp.stop()
	tm.wg.Done()
}(ts)

The Run method is in discovery/discovery.go:

func (ts *TargetSet) Run(ctx context.Context) {
Loop:
	for {
		// Throttle syncing to once per five seconds.
		select {
		case <-ctx.Done():
			break Loop
		case p := <-ts.providerCh:
			ts.updateProviders(ctx, p)
		case <-time.After(5 * time.Second):
		}

		select {
		case <-ctx.Done():
			break Loop
		case <-ts.syncCh:
			ts.sync()
		case p := <-ts.providerCh:
			ts.updateProviders(ctx, p)
		}
	}
}
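
The first select is the throttle: it only falls through once every five seconds (or when a provider update or cancellation arrives), so a burst of discovery updates results in at most one sync per five-second window. A standalone sketch of that coalescing idea, using hypothetical names that are not from the Prometheus source:

package main

import (
	"fmt"
	"time"
)

// throttledSync drains all pending events at most once per period, so a burst
// of updates triggers a single handler call instead of one call per update.
func throttledSync(events <-chan string, period time.Duration, handle func(int)) {
	for {
		time.Sleep(period) // wait out the throttle window
		pending := 0
	drain:
		for {
			select {
			case <-events:
				pending++
			default:
				break drain
			}
		}
		if pending > 0 {
			handle(pending) // one sync, however many updates arrived
		}
	}
}

func main() {
	events := make(chan string, 8)
	go throttledSync(events, 5*time.Second, func(n int) {
		fmt.Printf("sync after %d update(s)\n", n)
	})

	for i := 0; i < 3; i++ {
		events <- "target group updated"
	}
	time.Sleep(6 * time.Second) // prints a single "sync after 3 update(s)"
}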

Within Run, the sync method looks like this:

func (ts *TargetSet) sync() {
	ts.mtx.RLock()
	var all []*config.TargetGroup
	for _, tg := range ts.tgroups {
		all = append(all, tg)
	}
	ts.mtx.RUnlock()

	ts.syncer.Sync(all)
}

It iterates over the TargetGroups held by the TargetSet, collects them into the all slice, and hands them to the syncer. The syncer here is the scrapePool, and its Sync method turns each group into concrete targets:

func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
	start := time.Now()

	var all []*Target
	for _, tg := range tgs {
		targets, err := targetsFromGroup(tg, sp.config)
		if err != nil {
			log.With("err", err).Error("creating targets failed")
			continue
		}
		all = append(all, targets...)
	}
	sp.sync(all)

	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
		time.Since(start).Seconds(),
	)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}
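
targetsFromGroup is where each entry of a TargetGroup becomes a concrete *Target: the entry's own labels are merged with the group's common labels and the job's relabeling rules are applied. A much simplified illustration of just the label-merging part (hypothetical helper, not the actual function):

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// mergeLabels combines a group's common labels with a single target's own
// labels; per-target labels take precedence over the group-level ones.
func mergeLabels(group, target model.LabelSet) model.LabelSet {
	merged := model.LabelSet{}
	for name, value := range group {
		merged[name] = value
	}
	for name, value := range target {
		merged[name] = value // per-target labels override group labels
	}
	return merged
}

func main() {
	group := model.LabelSet{"job": "node", "env": "prod"}
	target := model.LabelSet{model.AddressLabel: "10.0.0.1:9100", "env": "staging"}
	fmt.Println(mergeLabels(group, target))
}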

Now look at the lower-case sync method it calls:

func (sp *scrapePool) sync(targets []*Target) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	var (
		uniqueTargets = map[uint64]struct{}{}
		interval      = time.Duration(sp.config.ScrapeInterval)
		timeout       = time.Duration(sp.config.ScrapeTimeout)
	)

	for _, t := range targets {
		hash := t.hash()
		uniqueTargets[hash] = struct{}{}

		if _, ok := sp.targets[hash]; !ok {
			s := &targetScraper{Target: t, client: sp.client}
			l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)

			sp.targets[hash] = t
			sp.loops[hash] = l

			go l.run(interval, timeout, nil)
		}
	}

	var wg sync.WaitGroup

	// Stop and remove old targets and scraper loops.
	for hash := range sp.targets {
		if _, ok := uniqueTargets[hash]; !ok {
			wg.Add(1)
			go func(l loop) {
				l.stop()
				wg.Done()
			}(sp.loops[hash])

			delete(sp.loops, hash)
			delete(sp.targets, hash)
		}
	}

	wg.Wait()
}
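
sync deduplicates by t.hash(): every target is reduced to a uint64 that keys both sp.targets and sp.loops, so new targets get a loop started and targets that have disappeared get theirs stopped. A sketch of how such a key can be derived from a target's label set and URL (an assumption, modeled on the idea behind Target.hash):

package main

import (
	"fmt"
	"hash/fnv"

	"github.com/prometheus/common/model"
)

// targetKey reduces a target to a single uint64 by hashing its label
// fingerprint together with its scrape URL; an identical label set plus URL
// yields an identical key, which is what lets sync() treat the maps as sets.
func targetKey(labels model.LabelSet, url string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(labels.Fingerprint().String()))
	h.Write([]byte(url))
	return h.Sum64()
}

func main() {
	labels := model.LabelSet{"job": "node", "instance": "10.0.0.1:9100"}
	fmt.Println(targetKey(labels, "http://10.0.0.1:9100/metrics"))
}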

For each target that is not already in the pool, go l.run(interval, timeout, nil) starts a scrape loop:

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	defer close(sl.done)

	select {
	case <-time.After(sl.scraper.offset(interval)):
		// Continue after a scraping offset.
	case <-sl.ctx.Done():
		return
	}

	var last time.Time

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-sl.ctx.Done():
			return
		default:
		}

		if !sl.appender.NeedsThrottling() {
			var (
				start                 = time.Now()
				scrapeCtx, _          = context.WithTimeout(sl.ctx, timeout)
				numPostRelabelSamples = 0
			)

			// Only record after the first scrape.
			if !last.IsZero() {
				targetIntervalLength.WithLabelValues(interval.String()).Observe(
					time.Since(last).Seconds(),
				)
			}

			samples, err := sl.scraper.scrape(scrapeCtx, start)
			if err == nil {
				numPostRelabelSamples, err = sl.append(samples)
			}
			if err != nil && errc != nil {
				errc <- err
			}

			sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
			last = start
		} else {
			targetSkippedScrapes.Inc()
		}

		select {
		case <-sl.ctx.Done():
			return
		case <-ticker.C:
		}
	}
}
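
Before the ticker starts, run waits for sl.scraper.offset(interval): each target gets an initial delay derived from its hash, so that targets sharing the same interval do not all fire at the same instant. A simplified illustration of that idea (not the exact offset implementation, which also takes the current wall-clock time into account):

package main

import (
	"fmt"
	"time"
)

// initialDelay spreads scrape start times across one interval by deriving a
// per-target delay from the target's hash.
func initialDelay(hash uint64, interval time.Duration) time.Duration {
	return time.Duration(hash % uint64(interval))
}

func main() {
	interval := 15 * time.Second
	for _, hash := range []uint64{0xdeadbeef, 0xcafebabe, 0x12345678} {
		fmt.Println(initialDelay(hash, interval))
	}
}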

Samples are collected by sl.scraper.scrape, which simply issues an HTTP GET against the target's metrics URL and decodes the response:

func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
	req, err := http.NewRequest("GET", s.URL().String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Accept", acceptHeader)
	req.Header.Set("User-Agent", userAgentHeader)

	resp, err := ctxhttp.Do(ctx, s.client, req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
	}

	var (
		allSamples = make(model.Samples, 0, 200)
		decSamples = make(model.Vector, 0, 50)
	)
	sdec := expfmt.SampleDecoder{
		Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
		Opts: &expfmt.DecodeOptions{
			Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
		},
	}

	for {
		if err = sdec.Decode(&decSamples); err != nil {
			break
		}
		allSamples = append(allSamples, decSamples...)
		decSamples = decSamples[:0]
	}

	if err == io.EOF {
		// Set err to nil since it is used in the scrape health recording.
		err = nil
	}
	return allSamples, err
}
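
The other end of that GET request is just an HTTP endpoint serving the Prometheus text exposition format. For completeness, here is a minimal target the scraper could pull from, using the standard client_golang library (this is not part of the server code discussed above):

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Serve the default registry's metrics on /metrics; this is exactly the kind
// of endpoint targetScraper.scrape issues its GET request against.
func main() {
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}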

That is the entire data collection flow: discovery feeds target groups into the scrape pool, the pool keeps one scrape loop per target, and each loop periodically pulls samples from its target over HTTP and appends them to storage.
