Notification-Manager 原码分析(三)消息接收、发送流程分析

notification-manager 原码分析(三)消息接收、发送流程分析

main 启动文件解析

  • 文件路径: cmd/notification-manager/main.go

main主流程

  • 1、相关flag参数解析

    1
    kingpin.Parse()
  • 2、日志级别设置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
    if *logfmt == logFormatJson {
    logger = log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
    }

    switch *logLevel {
    case logLevelDebug:
    logger = level.NewFilter(logger, level.AllowDebug())
    case logLevelInfo:
    logger = level.NewFilter(logger, level.AllowInfo())
    case logLevelWarn:
    logger = level.NewFilter(logger, level.AllowWarn())
    case logLevelError:
    logger = level.NewFilter(logger, level.AllowError())
    default:
    _, _ = fmt.Fprintf(os.Stderr, "log level %v unknown, %v are possible values", *logLevel, logLevels)
    return 1
    }

    logger = log.With(logger, "ts", log.DefaultTimestamp)
    logger = log.With(logger, "caller", log.DefaultCaller)
    _ = level.Info(logger).Log("msg", "Starting notification manager...", "addr", *listenAddress, "timeout", *webhookTimeout)

  • 3、初始化controller并传递context和logger,这里主要是相关k8s资源使用的controller

    1
    2
    3
    4
    5
    6
    7
    8
    // Setup notification manager controller
    var err error
    ctlCtx, cancelCtl := context.WithCancel(context.Background())
    defer cancelCtl()
    if ctl, err = controller.New(ctlCtx, logger); err != nil {
    _ = level.Error(logger).Log("msg", "Failed to create notification manager controller")
    return -1
    }
  • 4、创建infomer watch k8s 资源,这里包含自定义的CRD和template配置及相关secret

    1
    2
    3
    4
    5
    // Sync notification manager config
    if err := ctl.Run(); err != nil {
    _ = level.Error(logger).Log("msg", "Failed to create sync notification manager controller")
    return -1
    }
  • 5、创建存储处理器,管理alert数据,系统默认实现基于memory的channel队列,实现了push/pull/colose方法

    1
    alerts := store.NewAlertStore(*storeType)
  • 6、注册启动http server,wh.New追进去是具体的http路由注册,消息接收的部分再单独解析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // webhook 请求路由注册
    // Setup webhook to receive alert/notification msg
    webhook := wh.New(
    logger,
    ctl,
    alerts,
    &wh.Options{
    ListenAddress: *listenAddress,
    WebhookTimeout: *webhookTimeout,
    WorkerTimeout: *wkrTimeout,
    })

    ctxHttp, cancelHttp := context.WithCancel(context.Background())
    defer cancelHttp()

    srvCh := make(chan error, 1)
    go func() {
    // 启动http服务, 如果退出会往chanel写入close,下边channel会接收处理http服务关闭逻辑
    srvCh <- webhook.Run(ctxHttp)
    }()

  • 7、消息调度: 启动异步任务,轮训告警消息队列请求alert消息,如果拿到就走消息发送流程逻辑

    1
    2
    3
    4
    5
    6
    7
    dispCh := make(chan error, 1)
    // 告警、通知消息调度处理
    disp := dispatcher.New(logger, ctl, alerts, *webhookTimeout, *wkrTimeout, *wkrQueue)
    go func() {
    // 调度器运行: 循环pull消息,拿到就走发送逻辑
    dispCh <- disp.Run()
    }()
  • 8、监听系统信号和上边创建的chanel,处理异常和关闭事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
termCh := make(chan os.Signal, 1)
signal.Notify(termCh, os.Interrupt, syscall.SIGTERM)

for {
select {
case <-termCh:
_ = level.Info(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
cancelHttp()
case err := <-srvCh:
if err != nil {
_ = level.Error(logger).Log("msg", "Abnormal exit", "error", err.Error())
}
_ = alerts.Close()
_ = level.Info(logger).Log("msg", "Store closed")
case <-dispCh:
_ = level.Info(logger).Log("msg", "Dispatcher closed")
return 0
}
}

整体看启动入口逻辑并不复杂。核心两块:一块接收api消息,和另外一块对接收的消息异步队列去处理送消息。

消息接收

  • http路由:

    • 基于main入口webhook := wh.New(追进去可以看到对外提供的api和处理方法
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      h.router.Get("/receivers", h.handler.ListReceivers)
      h.router.Get("/configs", h.handler.ListConfigs)
      h.router.Get("/receiverWithConfig", h.handler.ListReceiverWithConfig)
      h.router.Post("/api/v2/alerts", h.handler.Alert)
      h.router.Post("/api/v2/verify", h.handler.Verify)
      h.router.Post("/api/v2/notifications", h.handler.Notification)
      h.router.Get("/metrics", h.handler.ServeMetrics)
      h.router.Get("/-/reload", h.handler.ServeReload)
      h.router.Get("/-/ready", h.handler.ServeHealthCheck)
      h.router.Get("/-/live", h.handler.ServeReadinessCheck)
      h.router.Get("/status", h.handler.ServeStatus)
  • 接收alert消息Api h.router.Post("/api/v2/alerts", h.handler.Alert) 就是对外主要提供消息发送的webhook Api了,查看实现

消息接收h.handler.Alert实现内容

  • 1、读取request body体,

    1
    2
    3
    4
    5
    6
    b, err := io.ReadAll(r.Body)
    if err != nil {
    fmt.Println(err)
    return
    }

  • 2、数据解析、绑定

    1
    2
    3
    4
    5
    data := template.Data{}
    if err := utils.JsonDecode(r.Body, &data); err != nil {
    h.handle(w, &response{http.StatusBadRequest, err.Error()})
    return
    }
  • 3、接收alert、加工和push队列操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    cluster := h.notifierCtl.GetCluster()
    for _, alert := range data.Alerts {
    if v := alert.Labels["cluster"]; v == "" {
    // 追加cluster label操作,如果有就忽略,没有尝试从集群配置中读取并设置上
    alert.Labels["cluster"] = cluster
    }
    alert.ID = utils.Hash(alert)
    // 通过Provider push 推送alert到store中,至此消息接收的洛基就完成了。
    // dispatcher中轮训d.alerts.Pull拉取数据进行告警发送
    if err := h.alerts.Push(alert); err != nil {
    _ = level.Error(h.logger).Log("msg", "push alert error", "error", err.Error())
    }
    }

  • 4、看下push操作:

    • memProvider系统实现了一个channel,main入口的时候已经初始化了,这里直接调用push就往channel写入alert数据进去了
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      func (p *memProvider) Push(alert *template.Alert) error {
      ctx, cancel := context.WithTimeout(context.Background(), *pushTimeout)
      defer cancel()

      select {
      case p.ch <- alert:
      return nil
      case <-ctx.Done():
      return utils.Error("Time out")
      }
      }

  • 5、返回成功接收提示信息

    1
    h.handle(w, &response{http.StatusOK, "Notification request accepted"})

至此消息接收就完成了。逻辑比较简单,接下来看下如何调度推送消息到reciver的。

消息发送流程逻辑

  • main入口文件dispCh <- disp.Run() 创建异步任务开始执行alert消息队列的调度和处理,追进去看下逻辑。

disp.Run()消息调度处理逻辑

  • Run函数代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    func (d *Dispatcher) Run() error {

    for {
    // err is not nil means the store had closed, dispatcher should process remaining alerts, then exit.
    if alerts, err := d.alerts.Pull(d.notifierCtl.GetBatchMaxSize(), d.notifierCtl.GetBatchMaxWait()); err == nil {
    go d.processAlerts(alerts)
    } else {
    d.processAlerts(alerts)
    return nil
    }
    }
    }
  • 1、轮询拉取alert数据d.alerts.Pull, 追进去能看到实际就是一个chanel等待alert消息,如果有就返回到当前方法了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    func (p *memProvider) Pull(batchSize int, batchWait time.Duration) ([]*template.Alert, error) {

    ctx, cancel := context.WithTimeout(context.Background(), batchWait)
    defer cancel()

    var as []*template.Alert
    for {
    select {
    case <-ctx.Done():
    return as, nil
    case alert := <-p.ch: // 读channel,对应/api/v2/alerts hannder中的push,通信就是通过channel。一端写,一端读
    if alert == nil {
    return as, utils.Error("Store closed")
    }
    as = append(as, alert)
    if len(as) >= batchSize {
    return as, nil
    }
    // 拿到消息后返回
    }
    }
    }
  • 2、processAlerts拿到alert后开始处理发送

processAlerts 处理alert消息逻辑

  • 1、打印日志信息,并且起了一个channel异步等待任务结束,并打印处理时间
1
2
3
4
5
6
_ = level.Debug(d.l).Log("msg", "Dispatcher: Begins to process alerts...", "alerts", len(alerts))

if err := d.getWorker(); err != nil { // 等待d.semCh信号,完成后答应处理总花费时间
return
}
defer d.releaseWorker() // 往<-d.semCh写入一个信号
  • 2、设置超时时间,main启动入口new的时候设置了,默认5s

    1
    2
    3
    4
    5
    6
    d.seq = d.seq + 1
    start := time.Now()
    ctx, cancel := context.WithTimeout(context.Background(), d.wkrTimeout)
    ctx = context.WithValue(ctx, "seq", d.seq)
    defer cancel()

  • 3、异步处理,并阻塞等待结果,到此处理大的逻辑结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
stopCh := make(chan struct{})
go d.worker(ctx, alerts, stopCh)

select {
case <-stopCh:
elapsed := time.Since(start).String()
_ = level.Debug(d.l).Log("msg", "Dispatcher: Processor exit after "+elapsed)
return
case <-ctx.Done():
if err := ctx.Err(); err != nil {
_ = level.Warn(d.l).Log("msg", "Dispatcher: process alerts timeout in "+d.wkrTimeout.String(), "error", err.Error())
}
return
}
  • 3.1、调度worker处理alert消息内容流程 go d.worker(ctx, alerts, stopCh)
    • 3.1.1、worker方法代码。
      • 这里添加了多个stage,对应README.md流程中的stage: silence-> router->filter-> aggregation->notify
      • 添加完stage之后调用pipeline.Exec执行对应stage的Exec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (d *Dispatcher) worker(ctx context.Context, data interface{}, stopCh chan struct{}) {

// 这里对应根目录readMe.md文档中说明的流程顺序(Process):silence-> router->filter-> aggregation->notify
pipeline := stage.MultiStage{}
// Global silence stage
pipeline = append(pipeline, silence.NewStage(d.notifierCtl))
// Route stage
pipeline = append(pipeline, route.NewStage(d.notifierCtl))
// Tenant silence stage
pipeline = append(pipeline, filter.NewStage(d.notifierCtl))
// Aggregation stage
pipeline = append(pipeline, aggregation.NewStage(d.notifierCtl))
// Notify stage
pipeline = append(pipeline, notify.NewStage(d.notifierCtl))

_, output, err := pipeline.Exec(ctx, d.l, data)
if err != nil {
_ = level.Error(d.l).Log("msg", "Dispatcher: process alerts failed", "seq", ctx.Value("seq"))
}

go d.execHistoryStage(ctx.Value("seq"), output) // 这里还有一块历史记录stage,默认alert数据存放的内存,如果需要留底,可以通过history webhook记录。

stopCh <- struct{}{}
}

  • 4、alert消息处理stage pipeline.Exec(ctx, d.l, data)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Exec implements the Stage interface.
func (ms MultiStage) Exec(ctx context.Context, l log.Logger, data interface{}) (context.Context, interface{}, error) {
var err error
for _, s := range ms {
if reflect2.IsNil(data) {
return ctx, nil, nil
}

ctx, data, err = s.Exec(ctx, l, data)
if err != nil {
return ctx, data, err
}
}
return ctx, data, nil
}

  • ctx, data, err = s.Exec(ctx, l, data) 这里调用pipeline 对应的stage具体实现Exec方法, 顺序silence-> router->filter-> aggregation->notify

  • 每个stage实现内容:

    • silence: 如果有静默规则匹配则跳过告警消息; 否则数据追加最终返回
    • router: router的label匹配,租户和接收者匹配,匹配的receiver复制告警消息到receiver,给后续发送使用
    • filter: 消息过滤,如果处于静默的消息、标签匹配的这里会过滤掉
    • aggregation: 聚合
    • notify: 最终具体的消息发送处理逻辑
  • notify stage Exec稍微展开看下, 代码位置: pkg/notify/notify.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    ...
    for k, v := range input {
    receiver := k
    ds := v
    // 定义可以借鉴,直接就获取到了消息类型的数据结构和controller,后边就可以直接调用对应的controller方法,nice
    nf, err := factories[receiver.GetType()](l, receiver, s.notifierCtl)
    if err != nil {
    e := err
    group.Add(func(stopCh chan interface{}) {
    stopCh <- e
    })
    continue
    }
    // 直接调用对应告警消息类型的实现去发送消息了,类型、controller都做了区分,直接调用对应实现逻辑了。
    nf.SetSentSuccessfulHandler(&handler)

    for _, d := range ds {
    alert := d
    group.Add(func(stopCh chan interface{}) {
    // 这里就调实际各自类型receiver接收类型的发送方法了,开始发送消息
    stopCh <- nf.Notify(ctx, alert)
    })
    }
    }
    ...
  • 官方默认实现了如下的notify, 对应里面就是具体发送消息的逻辑,比如email、feishu等。不同的发送渠道参数不一样,所以receiver中对应的接收渠道参数也率有差异

  • 最终反回发送状态和结果。至此发送消息逻辑就结束了。返回到一直循环的方法,不停循环一直监听alert channel,有数据后发送。