ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

DaemonSetController

2022-02-10 15:02:13  阅读:196  来源: 互联网

标签:return err ctx ds pod DaemonSetController dsc


cmd\kube-controller-manager\app\core.go
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	dsc, err := daemon.NewDaemonSetsController(
		controllerContext.InformerFactory.Apps().V1().DaemonSets(),
		controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.InformerFactory.Core().V1().Nodes(),
		controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"),
		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
	}
	go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs)) --->pkg\controller\daemon\daemon_controller.go
	return nil, true, nil
}

pkg\controller\daemon\daemon_controller.go

func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
	}
}


func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
	for dsc.processNextWorkItem(ctx) {
	}
}


func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
	...
	err := dsc.syncHandler(ctx, dsKey.(string)) --->syncDaemonSet
}


func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
	// 获取目标namespace下的DaemonSet
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	// 获取所有节点
	nodeList, err := dsc.nodeLister.List(labels.Everything())
	...
	// 如果已经创建或者删除的DaemonSet,那么不要处理
	dsKey, err := controller.KeyFunc(ds)
	// 如果存在正在被删除的DaemonSet,那么暂时结束此次操作
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// 对比新旧DaemonSet
	cur, old, err := dsc.constructHistory(ctx, ds)
	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
	if !dsc.expectations.SatisfiedExpectations(dsKey) {
		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
		return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
	}
	// 管理DaemonSet
	err = dsc.manage(ctx, ds, nodeList, hash)

	// Process rolling updates if we're ready.
	if dsc.expectations.SatisfiedExpectations(dsKey) {
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
		}
		if err != nil {
			return err
		}
	}

	err = dsc.cleanupHistory(ctx, ds, old)
	if err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
	}

	return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
}


**manage**
	// 找出要运行DaemonSet的具体pod对应节点
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds)
	// 计算所有节点,对该运行pod的节点进行创建,不该运行的进行停止
	var nodesNeedingDaemonPods, podsToDelete []string
	for _, node := range nodeList {
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
			node, nodeToDaemonPods, ds, hash)
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
	}

	podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
	// 同步节点信息
	if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}


**podsShouldBeOnNode**
	shouldRun, shouldContinueRunning := dsc.nodeShouldRunDaemonPod(node, ds)
	daemonPods, exists := nodeToDaemonPods[node.Name]

	switch {
	case shouldRun && !exists:
		// 如果需要并且不存在则创建
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
	case shouldContinueRunning:
		// 如果创建失败则删除
		...
	case !shouldContinueRunning && exists:
		// 如果不应该运行但是存在则要删除他
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			podsToDelete = append(podsToDelete, pod.Name)
		}
	}
	return nodesNeedingDaemonPods, podsToDelete


// 确定pod是否该运行,以及是否该继续运行
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
	pod := NewPod(ds, node.Name)

	// 如果DaemonSet指定了运行节点,则进行节点名对比,不符合则不用运行
	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
		return false, false
	}
	
	// 如果节点有污点信息,则要计算pod是否可容忍该污点,不容忍则不用运行
	taints := node.Spec.Taints
	fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints)
	if !fitsNodeName || !fitsNodeAffinity {
		return false, false
	}

	if !fitsTaints {
		// 可容忍污点则继续调度
		_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
			return t.Effect == v1.TaintEffectNoExecute
		})
		return false, !hasUntoleratedTaint
	}

	return true, true
}


syncNodes
	...
	// 批量创建pod
	createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()

				podTemplate := template.DeepCopy()
				podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
					podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
				err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
					ds, metav1.NewControllerRef(ds, controllerKind))
			}(i)
		}
		createWait.Wait()
	...
	// 删除pod
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
				dsc.expectations.DeletionObserved(dsKey)
			}
		}(i)
	}
	deleteWait.Wait()

标签:return,err,ctx,ds,pod,DaemonSetController,dsc
来源: https://www.cnblogs.com/bfmq/p/15879096.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有