Drone Source Code Analysis (Part 2): Task Scheduling and Execution Flow

Registering the runner

  • Entry point: cmd/drone-server/main.go

  • 1. Initialize the application: many services and parameters are registered at startup. We only look at the runner here, which shows up once you step into InitializeApplication.

    app, err := InitializeApplication(config)
    if err != nil {
        logger := logrus.WithError(err)
        logger.Fatalln("main: cannot initialize server")
    }

  • 2. Build the runner

    runner := provideRunner(buildManager, secretService, registryService, config2)

  • 3. Inject the runner into the application

    mainApplication := newApplication(cronScheduler, reaper, datadog, runner, serverServer, userStore)

Drone offers several runners (Docker, Kubernetes, and others). Runner repositories: https://github.com/drone-runners

  • 4. Start the HTTP server from main

    g := errgroup.Group{}
    g.Go(func() error {
        logrus.WithFields(
            logrus.Fields{
                "proto": config.Server.Proto,
                "host":  config.Server.Host,
                "port":  config.Server.Port,
                "url":   config.Server.Addr,
                "acme":  config.Server.Acme,
            },
        ).Infoln("starting the http server")
        return app.server.ListenAndServe(ctx)
    })


  • 5. Start the runner: the built-in runner is not started when a standalone runner is configured. For example, if the Drone Kubernetes runner is configured, that runner is used instead.

    // launches the build runner in a goroutine. If the local
    // runner is disabled (because nomad or kubernetes is enabled)
    // then the goroutine exits immediately without error.
    g.Go(func() (err error) {
        if app.runner == nil {
            return nil
        }
        logrus.WithField("threads", config.Runner.Capacity).
            Infoln("main: starting the local build runner")
        return app.runner.Start(ctx, config.Runner.Capacity)
    })


  • Helper library for building runners: https://github.com/drone/runner-go

  • Kubernetes runner: https://github.com/drone-runners/drone-runner-kube

  • Taking the Kubernetes runner as an example: command/command.go registers the commands and starts the runner.

    • 1. Register the commands

      // Command parses the command line arguments and then executes a
      // subcommand program.
      func Command() {
          app := kingpin.New("drone", "drone kubernetes runner")
          registerCompile(app)
          registerExec(app)
          daemon.Register(app)

          kingpin.Version(version)
          kingpin.MustParse(app.Parse(os.Args[1:]))
      }
    • 2. Start the runner

      // Register the daemon command.
      func Register(app *kingpin.Application) {
          c := new(daemonCommand)

          cmd := app.Command("daemon", "starts the runner daemon").
              Default().
              Action(c.run)

          cmd.Arg("envfile", "load the environment variable file").
              Default("").
              StringVar(&c.envfile)
      }

  • 3. command/daemon/daemon.go defines both the server startup and the runner's task polling

    var g errgroup.Group
    server := server.Server{
        Addr: config.Server.Port,
        Handler: router.New(tracer, hook, router.Config{
            Username: config.Dashboard.Username,
            Password: config.Dashboard.Password,
            Realm:    config.Dashboard.Realm,
        }),
    }

    logrus.WithField("addr", config.Server.Port).
        Infoln("starting the server")

    g.Go(func() error {
        return server.ListenAndServe(ctx)
    })


    g.Go(func() error {
        logrus.WithField("capacity", config.Runner.Capacity).
            WithField("endpoint", config.Client.Address).
            WithField("kind", resource.Kind).
            WithField("type", resource.Type).
            Infoln("polling the remote server")

        poller.Poll(ctx, config.Runner.Capacity)
        return nil
    })

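  • Summary:
    • If no standalone runner is configured, the server starts goroutines by default that pull, schedule, and execute tasks, running them with Docker.
    • If a standalone runner is deployed (for example the Kubernetes runner), the default goroutines are not used. The standalone runner maintains its own scheduling workers.
    • Users can implement their own runner on top of runner-go.
    • What a runner has to do: register with the server using a token, watch the server for pending tasks, pull a task once its platform and other parameters match, then schedule it, create the resources, and run it. There is more to handle as well, such as streaming live task logs. The Kubernetes runner and runner-go are good references for implementing the runner you need.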

Task scheduling

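  • See the task creation call t.sched.Schedule(ctx, stage) in Drone Source Code Analysis (Part 1): Task Creation Flow. When a pending stage appears, the server pushes it onto the queue.
  • A runner that is already running then calls poller.Poll(ctx, config.Runner.Capacity) to query the server queue and pulls the task over RPC. Once a task has been pulled, the runner's own scheduling and execution logic takes over.
  • The analysis below follows the logic in the drone project's own code. Other runners are broadly similar: each implements its own runner that creates a different kind of resource to execute the task.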

Runner startup and task polling

  • 1. main starts the runner. The entry point is cmd/drone-server/main.go

    g.Go(func() (err error) {
        if app.runner == nil {
            return nil
        }
        logrus.WithField("threads", config.Runner.Capacity).
            Infoln("main: starting the local build runner")
        return app.runner.Start(ctx, config.Runner.Capacity)
    })
  • 2. Start the number of runner workers specified by the configuration

    // Start starts N build runner processes. Each process polls
    // the server for pending builds to execute.
    func (r *Runner) Start(ctx context.Context, n int) error {
        var g errgroup.Group
        for i := 0; i < n; i++ {
            g.Go(func() error {
                return r.start(ctx)
            })
        }
        return g.Wait()
    }

  • 3. Keep polling for tasks in a loop with r.poll(ctx)

    func (r *Runner) start(ctx context.Context) error {
        for {
            select {
            case <-ctx.Done():
                return nil
            default:
                // This error is ignored on purpose. The system
                // should not exit the runner on error. The run
                // function logs all errors, which should be enough
                // to surface potential issues to an administrator.
                r.poll(ctx)
            }
        }
    }

  • 4. The task-pulling logic in the poll function

  • 4.1 Request a task that the current runner is able to execute

    p, err := r.Manager.Request(ctx, &manager.Request{
        Kind:    "pipeline",
        Type:    "docker",
        OS:      r.OS,
        Arch:    r.Arch,
        Kernel:  r.Kernel,
        Variant: r.Variant,
        Labels:  r.Labels,
    })
    if err != nil {
        logger = logger.WithError(err)
        logger.Warnln("runner: cannot get queue item")
        return err
    }
    if p == nil || p.ID == 0 {
        return nil
    }

  • 4.2 Take the lock and accept the task

    _, err = r.Manager.Accept(ctx, p.ID, r.Machine)
    if err == db.ErrOptimisticLock {
        return nil
    } else if err != nil {
        logger.WithError(err).
            WithFields(
                logrus.Fields{
                    "stage-id": p.ID,
                    "build-id": p.BuildID,
                    "repo-id":  p.RepoID,
                }).Warnln("runner: cannot ack stage")
        return err
    }
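
The db.ErrOptimisticLock branch is what lets several runners race for the same stage safely: the loser simply returns and polls again. As a rough illustration of optimistic locking in general (an assumption about the technique, not Drone's actual storage code), the sketch below guards an update with a version number. `stageRow`, `store`, and `accept` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrOptimisticLock mirrors the role of db.ErrOptimisticLock.
var ErrOptimisticLock = errors.New("optimistic lock error")

// stageRow is a hypothetical database row with a version column.
type stageRow struct {
	Machine string
	Version int64
}

// store simulates a table. The mutex stands in for the atomicity
// a database gives to a single UPDATE statement.
type store struct {
	mu   sync.Mutex
	rows map[int64]*stageRow
}

// accept behaves like:
//   UPDATE stages SET machine = ?, version = version + 1
//   WHERE id = ? AND version = ?
// and reports ErrOptimisticLock when no row was updated.
func (s *store) accept(id, seenVersion int64, machine string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	row, ok := s.rows[id]
	if !ok {
		return errors.New("stage not found")
	}
	if row.Version != seenVersion {
		return ErrOptimisticLock // another runner got there first
	}
	row.Machine = machine
	row.Version++
	return nil
}

func main() {
	s := &store{rows: map[int64]*stageRow{1: {Version: 1}}}

	// Both runners read the stage at version 1, then try to accept it.
	fmt.Println(s.accept(1, 1, "runner-a")) // <nil>
	fmt.Println(s.accept(1, 1, "runner-b")) // optimistic lock error
}
```
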
  • 4.3 Watch for the task's cancel signal. If a cancel signal arrives, task execution is stopped

    go func() {
        logger.Debugln("runner: watch for cancel signal")
        done, _ := r.Manager.Watch(ctx, p.BuildID)
        if done {
            cancel()
            logger.Debugln("runner: received cancel signal")
        } else {
            logger.Debugln("runner: done listening for cancel signals")
        }
    }()

  • 4.4 r.Run(ctx, p.ID) schedules the task and creates the resources that run it

    return r.Run(ctx, p.ID)

  • 5. The execution logic of runner.Run

    func (r *Runner) Run(ctx context.Context, id int64) error {
        ...
    }
  • 5.1 Recover from panics

    defer func() {
        // taking the paranoid approach to recover from
        // a panic that should absolutely never happen.
        if r := recover(); r != nil {
            logger.Errorf("runner: unexpected panic: %s", r)
            debug.PrintStack()
        }
    }()
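
The deferred recover keeps a panic in one build from taking down the whole worker goroutine. The original only logs the panic. As a variation, the sketch below (with a hypothetical `safeRun` helper) uses a named return value to convert the panic into an error the caller can inspect.

```go
package main

import "fmt"

// safeRun calls fn and converts a panic into an ordinary error.
// The named result lets the deferred function set the return value.
func safeRun(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("unexpected panic: %v", r)
		}
	}()
	return fn()
}

func main() {
	err := safeRun(func() error {
		var steps map[string]int
		steps["build"] = 1 // writing to a nil map panics
		return nil
	})
	fmt.Println("recovered:", err != nil)
}
```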


  • 5.2 Request the task details

    m, err := r.Manager.Details(ctx, id)
    if err != nil {
        logger.WithError(err).Warnln("runner: cannot get stage details")
        return err
    }

  • 5.3 Request the netrc

    netrc, err := r.Manager.Netrc(ctx, m.Repo.ID)
    if err != nil {
        logger = logger.WithError(err)
        logger.Warnln("runner: cannot get netrc file")
        return r.handleError(ctx, m.Stage, err)
    }
    if netrc == nil {
        netrc = new(core.Netrc)
    }

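  • The Netrc struct definition. Netrc holds the login and initialization information used by the auto-login process.

    Netrc struct {
        Machine  string `json:"machine"`
        Login    string `json:"login"`
        Password string `json:"password"`
    }

  • NetrcService returns a valid netrc file that can be used to authenticate and clone a private repository. If authentication is not required or not enabled, it returns a nil Netrc file and a nil error.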

  • 5.4 Check the task status. If the task has been canceled by this point, the remaining logic is skipped

    if m.Build.Status == core.StatusKilled || m.Build.Status == core.StatusSkipped {
        logger = logger.WithError(err)
        logger.Infoln("runner: cannot run a canceled build")
        return nil
    }

  • 5.5 Build the environment variables

    environ := combineEnviron(
        agentEnviron(r),
        buildEnviron(m.Build),
        repoEnviron(m.Repo),
        stageEnviron(m.Stage),
        systemEnviron(m.System),
        linkEnviron(m.Repo, m.Build, m.System),
        m.Build.Params,
    )
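
The body of combineEnviron is not shown here. A plausible reading, and it is only an assumption, is a plain map merge in argument order where later maps override earlier ones, which would let m.Build.Params take precedence. A sketch of that merge with a hypothetical `combine` function and made-up variable names:

```go
package main

import "fmt"

// combine merges the maps in order. When a key appears more than
// once, the value from the later map wins.
func combine(envs ...map[string]string) map[string]string {
	out := map[string]string{}
	for _, env := range envs {
		for k, v := range env {
			out[k] = v
		}
	}
	return out
}

func main() {
	system := map[string]string{"SYSTEM_HOST": "drone.example.com", "CI": "true"}
	params := map[string]string{"CI": "custom", "DEPLOY_TO": "staging"}

	env := combine(system, params)
	fmt.Println(env["CI"], env["DEPLOY_TO"], len(env)) // custom staging 3
}
```
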
  • 5.6 Parse and validate the pipeline YAML file. This code is somewhat similar to the logic the trigger uses when creating the task.

    y, err := converter.ConvertString(string(m.Config.Data), converter.Metadata{
        Filename: m.Repo.Config,
        URL:      m.Repo.Link,
        Ref:      m.Build.Ref,
    })

    if err != nil {
        return err
    }

    y, err = envsubst.Eval(y, func(name string) string {
        env := environ[name]
        if strings.Contains(env, "\n") {
            env = fmt.Sprintf("%q", env)
        }
        return env
    })
    if err != nil {
        return r.handleError(ctx, m.Stage, err)
    }

    manifest, err := yaml.ParseString(y)
    if err != nil {
        logger = logger.WithError(err)
        logger.Warnln("runner: cannot parse yaml")
        return r.handleError(ctx, m.Stage, err)
    }
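
The substitution callback quotes any value containing a newline so that a multi-line variable cannot break the YAML structure. To show the effect without the envsubst dependency, this sketch uses the standard library's os.Expand as a stand-in. It only understands `$VAR` and `${VAR}`, not the bash-style string operations envsubst supports, and `substitute` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// substitute expands ${NAME} references in the YAML text. Values
// that contain a newline are emitted as a quoted, escaped string.
func substitute(yamlText string, environ map[string]string) string {
	return os.Expand(yamlText, func(name string) string {
		env := environ[name]
		if strings.Contains(env, "\n") {
			env = fmt.Sprintf("%q", env)
		}
		return env
	})
}

func main() {
	environ := map[string]string{
		"DRONE_BRANCH":         "main",
		"DRONE_COMMIT_MESSAGE": "fix bug\n\nlonger description",
	}
	fmt.Println(substitute("branch: ${DRONE_BRANCH}", environ))
	fmt.Println(substitute("message: ${DRONE_COMMIT_MESSAGE}", environ))
}
```

The second line prints the message as a single quoted string with the newlines escaped, so the YAML parser still sees one scalar value.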

  • 5.7 Walk the parsed pipeline YAML and find the configuration for the stage of the current task

    var pipeline *yaml.Pipeline
    for _, resource := range manifest.Resources {
        v, ok := resource.(*yaml.Pipeline)
        if !ok {
            continue
        }
        if v.Name == m.Stage.Name {
            pipeline = v
            break
        }
    }
    if pipeline == nil {
        logger = logger.WithError(err)
        logger.Errorln("runner: cannot find named pipeline")
        return r.handleError(ctx, m.Stage,
            errors.New("cannot find named pipeline"),
        )
    }

    logger = logger.WithField("pipeline", pipeline.Name)

    err = linter.Lint(pipeline, m.Repo.Trusted)
    if err != nil {
        logger = logger.WithError(err)
        logger.Warnln("runner: yaml lint errors")
        return r.handleError(ctx, m.Stage, err)
    }
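
A manifest can hold several kinds of resources, so the loop uses a type assertion to skip anything that is not a pipeline before comparing names. The same lookup in isolation, with hypothetical `Resource`, `Pipeline`, and `Secret` types standing in for the yaml package:

```go
package main

import (
	"errors"
	"fmt"
)

// Resource is a hypothetical interface implemented by every
// document in a multi-document YAML manifest.
type Resource interface{ Kind() string }

type Pipeline struct{ Name string }

func (p *Pipeline) Kind() string { return "pipeline" }

type Secret struct{ Name string }

func (s *Secret) Kind() string { return "secret" }

// findPipeline returns the pipeline with the given name, skipping
// resources of any other concrete type.
func findPipeline(resources []Resource, name string) (*Pipeline, error) {
	for _, r := range resources {
		p, ok := r.(*Pipeline)
		if !ok {
			continue // not a pipeline, e.g. a secret
		}
		if p.Name == name {
			return p, nil
		}
	}
	return nil, errors.New("cannot find named pipeline")
}

func main() {
	manifest := []Resource{
		&Secret{Name: "test"},
		&Pipeline{Name: "build"},
		&Pipeline{Name: "test"},
	}
	p, err := findPipeline(manifest, "test")
	fmt.Println(p.Kind(), p.Name, err) // pipeline test <nil>

	_, err = findPipeline(manifest, "deploy")
	fmt.Println(err) // cannot find named pipeline
}
```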

  • 5.8 Configure the secrets and the Docker registry credentials

    secretService := secret.Combine(
        secret.Encrypted(),
        secret.Static(m.Secrets),
        r.Secrets,
    )
    registryService := registry.Combine(
        registry.Static(m.Secrets),
        r.Registry,
    )

  • 5.9 Build the Docker run configuration: image, volumes, workspace, labels, resource limits, and so on

    ....
    ir := comp.Compile(pipeline)

  • 5.10 Build the stage steps

    steps := map[string]*core.Step{}
    i := 0
    for _, s := range ir.Steps {
        if s.RunPolicy == engine.RunNever {
            continue
        }
        i++
        dst := &core.Step{
            Number:    i,
            Name:      s.Metadata.Name,
            StageID:   m.Stage.ID,
            Status:    core.StatusPending,
            ErrIgnore: s.IgnoreErr,
        }
        steps[dst.Name] = dst
        m.Stage.Steps = append(m.Stage.Steps, dst)
    }
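
One detail worth noticing is that the counter is incremented only after the RunNever check, so step numbers stay contiguous even when some compiled steps are excluded. A stripped-down sketch with hypothetical `compiledStep` and `Step` types:

```go
package main

import "fmt"

// compiledStep is a hypothetical stand-in for a step in the
// compiled spec. RunNever plays the role of engine.RunNever.
type compiledStep struct {
	Name     string
	RunNever bool
}

// Step is a simplified version of core.Step.
type Step struct {
	Number int
	Name   string
	Status string
}

// buildSteps numbers only the steps that are allowed to run.
func buildSteps(spec []compiledStep) []*Step {
	var out []*Step
	for _, s := range spec {
		if s.RunNever {
			continue // excluded steps do not consume a number
		}
		out = append(out, &Step{
			Number: len(out) + 1,
			Name:   s.Name,
			Status: "pending",
		})
	}
	return out
}

func main() {
	steps := buildSteps([]compiledStep{
		{Name: "clone"},
		{Name: "notify", RunNever: true},
		{Name: "build"},
	})
	for _, s := range steps {
		fmt.Println(s.Number, s.Name, s.Status)
	}
	// 1 clone pending
	// 2 build pending
}
```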

  • 5.11 Build the hooks (the code is fairly long)

    hooks := &runtime.Hook{
        ....
    }

  • 5.12 Create the runtime runner from the hooks assembled above and the compiled Docker engine spec

    runner := runtime.New(
        runtime.WithEngine(r.Engine),
        runtime.WithConfig(ir),
        runtime.WithHooks(hooks),
    )

  • 5.13 Run the runner

        m.Stage.Status = core.StatusRunning
        m.Stage.Started = time.Now().Unix()
        m.Stage.Machine = r.Machine
        err = r.Manager.BeforeAll(ctx, m.Stage)
        ...
        err = runner.Run(timeout)
        ...

        m.Stage.Status = core.StatusPassing
        m.Stage.Stopped = time.Now().Unix()
        for _, step := range m.Stage.Steps {
            if step.Status == core.StatusPending {
                step.Status = core.StatusSkipped
            }
            if step.Status == core.StatusRunning {
                step.Status = core.StatusPassing
                step.Stopped = time.Now().Unix()
            }
        }

        return r.Manager.AfterAll(ctx, m.Stage)
    }

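r.Manager.BeforeAll and r.Manager.AfterAll notify the server so that it updates the task status and performs the corresponding processing.
runner.Run(timeout) is the call that actually uses the Docker API to create containers.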
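
The loop before AfterAll tidies up any step that never reached a final state: steps still pending are marked skipped, and steps still running are marked passing with a stop time. The sketch below isolates that rule. The `Step` type and the status strings are illustrative stand-ins, not necessarily the values of the core.Status* constants.

```go
package main

import "fmt"

// Illustrative status values for this sketch only.
const (
	statusPending = "pending"
	statusRunning = "running"
	statusSkipped = "skipped"
	statusPassing = "passing"
)

// Step is a simplified version of core.Step.
type Step struct {
	Name    string
	Status  string
	Stopped int64
}

// finalize moves every step that is not yet in a final state into
// one: pending becomes skipped, running becomes passing.
func finalize(steps []*Step, now int64) {
	for _, s := range steps {
		switch s.Status {
		case statusPending:
			s.Status = statusSkipped
		case statusRunning:
			s.Status = statusPassing
			s.Stopped = now
		}
	}
}

func main() {
	steps := []*Step{
		{Name: "clone", Status: statusPassing, Stopped: 100},
		{Name: "build", Status: statusRunning},
		{Name: "deploy", Status: statusPending},
	}
	finalize(steps, 200)
	for _, s := range steps {
		fmt.Println(s.Name, s.Status, s.Stopped)
	}
	// clone passing 100
	// build passing 200
	// deploy skipped 0
}
```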

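  • 6.1 runner.Run. This differs from the Run in step 5: step 5 runs the task with the given task ID, whereas this one runs the runtime, which creates the containers, runs the pipeline, and waits for it to finish

    func (r *Runtime) Run(ctx context.Context) error {
        return r.Resume(ctx, 0)
    }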
  • 7. The logic of (r *Runtime) Resume(ctx context.Context, start int)

  • 7.1 Deferred cleanup: once the function finishes, the container environment is torn down

    defer func() {
        // note that we use a new context to destroy the
        // environment to ensure it is not in a canceled
        // stat