For more in-depth articles on Kubernetes, see my blog homepage on … or … .
Kubernetes v1.2 introduced Deployments, which manage Pods by creating ReplicaSets, and ReplicaSet is regarded as the next-generation ReplicationController. In practice, however, the only difference between ReplicaSet and ReplicationController is the kind of selector each supports:
- ReplicaSet supports both equality-based and set-based selector requirements.
- ReplicationController supports only equality-based selector requirements.
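To make the selector difference concrete, here is a minimal, stdlib-only Go sketch. It does not use the real k8s.io/apimachinery labels package: the names Requirement, Operator, matchesEquality and matchesSet are hypothetical, and the operator set (In, NotIn, Exists, DoesNotExist) is assumed to mirror the set-based selector operators.

```go
package main

import "fmt"

// Operator is a hypothetical set-based operator.
type Operator string

const (
	In           Operator = "In"
	NotIn        Operator = "NotIn"
	Exists       Operator = "Exists"
	DoesNotExist Operator = "DoesNotExist"
)

// Requirement is one set-based requirement, e.g. "tier in (frontend, backend)".
type Requirement struct {
	Key      string
	Operator Operator
	Values   []string
}

// matchesEquality models an equality-based selector (all an RC supports):
// every key=value pair in selector must be present in labels.
func matchesEquality(selector, labels map[string]string) bool {
	for k, v := range selector {
		if got, ok := labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// matchesSet models set-based requirements (supported by ReplicaSet); all requirements are ANDed.
func matchesSet(reqs []Requirement, labels map[string]string) bool {
	for _, r := range reqs {
		val, ok := labels[r.Key]
		switch r.Operator {
		case In:
			if !ok || !contains(r.Values, val) {
				return false
			}
		case NotIn: // an absent key satisfies NotIn
			if ok && contains(r.Values, val) {
				return false
			}
		case Exists:
			if !ok {
				return false
			}
		case DoesNotExist:
			if ok {
				return false
			}
		default:
			return false // unknown operator never matches
		}
	}
	return true
}

func contains(values []string, v string) bool {
	for _, x := range values {
		if x == v {
			return true
		}
	}
	return false
}

func main() {
	pod := map[string]string{"app": "nginx", "tier": "frontend"}
	fmt.Println(matchesEquality(map[string]string{"app": "nginx"}, pod))
	fmt.Println(matchesSet([]Requirement{{Key: "tier", Operator: In, Values: []string{"frontend", "backend"}}}, pod))
}
```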
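Deployments are of course still very useful: they support rolling deployments. I will analyze Deployments in a separate post; this post covers only the ReplicationController.
To be clear, "ReplicationController" here means the RC controller, not the RC resource.
This analysis is based on the Kubernetes v1.5 source code.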
ReplicationManager
ReplicationManager is the ReplicationController controller object; the different name makes it easy to tell apart from the ReplicationController resource API object in the code. Its struct definition is below.
pkg/controller/replication/replication_controller.go:75

```go
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in the system with actual running pods.
type ReplicationManager struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// internalPodInformer is used to hold a personal informer. If we're using
	// a normal shared informer, then the informer will be started for us. If
	// we have a personal informer, we must start it ourselves. If you start
	// the controller using NewReplicationManager(passing SharedInformer), this
	// will be null
	internalPodInformer cache.SharedIndexInformer

	// An rc is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of replication controllers, populated by the rcController
	rcStore cache.StoreToReplicationControllerLister
	// Watches changes to all replication controllers
	rcController *cache.Controller
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController cache.ControllerInterface

	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	lookupCache *controller.MatchingCache

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
	// manager behaves differently if GC is enabled.
	garbageCollectorEnabled bool
}
```
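The key fields are:
- podControl: provides the interface for creating and deleting Pods.
- burstReplicas: the maximum number of Pods created or deleted in one batch; per the source comment, the RC is temporarily suspended after that many creates/deletes until it observes their watch events.
- syncHandler: the function that actually performs the replica sync.
- expectations: a TTL cache of the Pod creations and deletions each RC expects to observe, with methods to update it.
- rcStore: an Indexer of ReplicationController resource objects, populated and maintained by rcController.
- rcController: watches all ReplicationController resources and applies the changes it sees to rcStore.
- podStore: an Indexer of Pods, populated and maintained by podController.
- podController: watches all Pod resources and applies the changes it sees to podStore.
- queue: a rate-limited queue holding the RCs waiting to be synced.
- lookupCache: caches Pod-to-RC matching results to speed up lookups.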
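Where the ReplicationController is started
If you have read my earlier post …, you may remember how the controller manager starts the ResourceQuotaController; the ReplicationController is started the same way. When kube-controller-manager calls newControllerInitializers to initialize its controllers, it registers startReplicationController, which starts the ReplicationController controller.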
cmd/kube-controller-manager/app/controllermanager.go:224

```go
func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefuleset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["certificatesigningrequests"] = startCSRController

	return controllers
}
```
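Following the code into startReplicationController, it is simple: it calls replicationcontroller.NewReplicationManager to create a ReplicationManager and runs that manager's Run method in a new goroutine. (In a go statement the receiver and arguments are evaluated in the calling goroutine, so NewReplicationManager itself runs synchronously and only Run executes in the new goroutine.)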
cmd/kube-controller-manager/app/core.go:55

```go
func startReplicationController(ctx ControllerContext) (bool, error) {
	go replicationcontroller.NewReplicationManager(
		ctx.InformerFactory.Pods().Informer(),
		ctx.ClientBuilder.ClientOrDie("replication-controller"),
		ResyncPeriod(&ctx.Options),
		replicationcontroller.BurstReplicas,
		int(ctx.Options.LookupCacheSizeForRC),
		ctx.Options.EnableGarbageCollector,
	).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
	return true, nil
}
```
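The registry pattern (a map from controller name to InitFunc, each of which starts its controller in the background) can be sketched in a self-contained way. This is a simplified, hypothetical model: ControllerContext here carries only a stop channel, the two init functions are placeholders rather than the real startReplicationController and startResourceQuotaController, and startControllers is an assumed name for the loop that invokes each InitFunc.

```go
package main

import (
	"fmt"
	"sort"
)

// ControllerContext is a hypothetical, stripped-down stand-in; it only carries the stop channel.
type ControllerContext struct {
	Stop <-chan struct{}
}

// InitFunc has the same shape as above: start a controller, report whether it was started.
type InitFunc func(ctx ControllerContext) (bool, error)

func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["replicationcontroller"] = func(ctx ControllerContext) (bool, error) {
		// Placeholder for: go NewReplicationManager(...).Run(workers, ctx.Stop)
		go func() { <-ctx.Stop }()
		return true, nil
	}
	controllers["resourcequota"] = func(ctx ControllerContext) (bool, error) {
		return true, nil // placeholder
	}
	return controllers
}

// startControllers (hypothetical) calls every registered InitFunc and returns the names that started.
func startControllers(ctx ControllerContext, inits map[string]InitFunc) ([]string, error) {
	var started []string
	for name, initFn := range inits {
		ok, err := initFn(ctx)
		if err != nil {
			return started, fmt.Errorf("error starting %q: %v", name, err)
		}
		if ok {
			started = append(started, name)
		}
	}
	sort.Strings(started) // map iteration order is random; sort for stable output
	return started, nil
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	started, err := startControllers(ControllerContext{Stop: stop}, newControllerInitializers())
	fmt.Println(started, err)
}
```

Registering by name keeps the controller manager's startup loop generic: adding a controller only means adding one map entry.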
Creating the ReplicationManager
As shown above, the controller manager creates a ReplicationManager object, which is the ReplicationController controller itself, through NewReplicationManager.
pkg/controller/replication/replication_controller.go:122

```go
// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	return newReplicationManager(
		eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}
```

pkg/controller/replication/replication_controller.go:132

```go
// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
	}

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventRecorder,
		},
		burstReplicas:           burstReplicas,
		expectations:            controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}

	rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ReplicationController{},
		// TODO: Can we have much longer period here?
		FullControllerResyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    rm.enqueueController,
			UpdateFunc: rm.updateRC,
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rm.addPod,
		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
		// the most frequent pod update is status, and the associated rc will only list from local storage, so
		// it should be ok.
		UpdateFunc: rm.updatePod,
		DeleteFunc: rm.deletePod,
	})
	rm.podStore.Indexer = podInformer.GetIndexer()
	rm.podController = podInformer.GetController()

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
	return rm
}
```
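newReplicationManager mainly configures the ReplicationManager, for example:
- It configures queue via workqueue.NewNamedRateLimitingQueue.
- It configures expectations via controller.NewUIDTrackingControllerExpectations.
- It configures rcStore, podStore, rcController, and podController.
- It sets syncHandler to rm.syncReplicationController. This is important enough to call out separately: as we will see later, syncReplicationController is the method that does the core work, and automatic replica maintenance is essentially all done by it.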
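The RC event handlers registered above (rm.enqueueController for add and delete) boil down to turning an object into a "namespace/name" key with controller.KeyFunc and pushing it onto queue. The toy sketch below illustrates that key format and the fact that a key already waiting in the queue is stored only once. keyFunc, splitKey, and dedupQueue are hypothetical simplifications, not the client-go workqueue, which additionally tracks keys that are currently being processed.

```go
package main

import (
	"fmt"
	"strings"
)

// objectMeta is a hypothetical minimal stand-in for an object's metadata.
type objectMeta struct {
	Namespace string
	Name      string
}

// keyFunc builds a "namespace/name" key (just "name" for cluster-scoped objects).
func keyFunc(m objectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

// splitKey reverses keyFunc.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// dedupQueue is a toy FIFO that holds each waiting key at most once.
type dedupQueue struct {
	items []string
	dirty map[string]bool // keys currently waiting in items
}

func newDedupQueue() *dedupQueue { return &dedupQueue{dirty: map[string]bool{}} }

func (q *dedupQueue) Add(key string) {
	if q.dirty[key] {
		return // already queued: collapse repeated events into one sync
	}
	q.dirty[key] = true
	q.items = append(q.items, key)
}

func (q *dedupQueue) Get() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key := q.items[0]
	q.items = q.items[1:]
	delete(q.dirty, key)
	return key, true
}

// enqueueController mimics what an RC event handler does: compute the key and queue it.
func enqueueController(q *dedupQueue, rc objectMeta) {
	q.Add(keyFunc(rc))
}

func main() {
	q := newDedupQueue()
	rc := objectMeta{Namespace: "default", Name: "nginx"}
	enqueueController(q, rc) // e.g. AddFunc
	enqueueController(q, rc) // a second event before the worker ran: deduplicated
	key, _ := q.Get()
	ns, name, _ := splitKey(key)
	fmt.Println(key, ns, name, len(q.items))
}
```

Because only keys are queued, a burst of events for one RC collapses into a single sync that reads the latest state from rcStore.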
Running ReplicationManager.Run
With the ReplicationManager created, it is time to do the work. Run is the starting point: it begins watching and syncing.
pkg/controller/replication/replication_controller.go:217

```go
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting RC Manager")
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}

	if rm.internalPodInformer != nil {
		go rm.internalPodInformer.Run(stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}
```
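- watching
  - go rm.rcController.Run(stopCh) watches all RCs.
  - go rm.podController.Run(stopCh) watches all Pods.
- syncing
  - Starts as many goroutines as workers.
  - Each goroutine runs rm.worker under wait.Until with a one-second period, so rm.worker is invoked again one second after it returns. rm.worker takes RCs off the queue and calls syncHandler to sync them; since it only returns once the queue is shut down, in practice it loops continuously rather than pausing between items.
  - Each goroutine exits only when stopCh is closed.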
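Below is the Run method of cache.Controller, which rcController uses to list and watch RCs. Pods are watched in the same way through the shared pod informer; as the internalPodInformer comment in the struct notes, a shared informer is started for the ReplicationManager rather than by it.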
pkg/client/cache/controller.go:84

```go
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	r.RunUntil(stopCh)

	wait.Until(c.processLoop, time.Second, stopCh)
}
```
The key part of syncing is ReplicationManager's worker method:
pkg/controller/replication/replication_controller.go:488

```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	workFunc := func() bool {
		key, quit := rm.queue.Get()
		if quit {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err == nil {
			rm.queue.Forget(key)
			return false
		}

		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("replication controller worker shutting down")
			return
		}
	}
}
```
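The main logic of worker is:
- Take an RC key from rm's rate-limited queue; if the queue has been shut down, exit.
- Call syncHandler to sync that RC. On success, call queue.Forget to clear the key's retry history; on failure, re-add the key with queue.AddRateLimited so it is retried after a backoff. In both cases queue.Done marks the key as finished.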
newReplicationManager registered syncReplicationController as the syncHandler via rm.syncHandler = rm.syncReplicationController, so the RC sync logic lives in syncReplicationController.
pkg/controller/replication/replication_controller.go:639

```go
// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
	trace := util.NewTrace("syncReplicationController: " + key)
	defer trace.LogIfLong(250 * time.Millisecond)

	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
		rm.queue.Add(key)
		return nil
	}

	obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rc := *obj.(*v1.ReplicationController)

	trace.Step("ReplicationController restored")
	rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
	trace.Step("Expectations restored")

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	// TODO: Do the List and Filter in a single pass, or use an index.
	var filteredPods []*v1.Pod
	if rm.garbageCollectorEnabled {
		// list all pods to include the pods that don't match the rc's selector
		// anymore but has the stale controller ref.
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
		matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
		// Adopt pods only if this replication controller is not going to be deleted.
		if rc.DeletionTimestamp == nil {
			for _, pod := range matchesNeedsController {
				err := cm.AdoptPod(pod)
				// continue to next pod if adoption fails.
				if err != nil {
					// If the pod no longer exists, don't even log the error.
					if !errors.IsNotFound(err) {
						utilruntime.HandleError(err)
					}
				} else {
					matchesAndControlled = append(matchesAndControlled, pod)
				}
			}
		}
		filteredPods = matchesAndControlled
		// remove the controllerRef for the pods that no longer have matching labels
		var errlist []error
		for _, pod := range controlledDoesNotMatch {
			err := cm.ReleasePod(pod)
			if err != nil {
				errlist = append(errlist, err)
			}
		}
		if len(errlist) != 0 {
			aggregate := utilerrors.NewAggregate(errlist)
			// push the RC into work queue again. We need to try to free the
			// pods again otherwise they will stuck with the stale
			// controllerRef.
			rm.queue.Add(key)
			return aggregate
		}
	} else {
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
		return err
	}

	return manageReplicasErr
}
```
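The main logic of syncReplicationController is:
1. If podStore has not yet been synced at least once, wait PodStoreSyncedPollPeriod, put the RC's key back into queue to wait for podStore to sync, and return. Otherwise continue.
2. Look up the RC object in rcStore by its key. If it does not exist, the RC has been deleted: remove its entry from expectations by key and return. Otherwise continue.
3. Use expectations to decide whether the RC needs a sync (rcNeedsSync). It does if there is no expectation record for it, if both its pending add and del counts have dropped to zero, or if the record was set more than 5 minutes ago and has therefore expired.
4. If garbage collection is enabled, list all Pods in the RC's namespace from podStore and classify them. Pods in matchesNeedsController are adopted only if the RC is not being deleted (DeletionTimestamp is nil); the successfully adopted ones, together with matchesAndControlled, become filteredPods. Pods in controlledDoesNotMatch have their controllerRef released, and if any release fails the key is requeued and the error returned. If GC is not enabled, filteredPods is simply the active Pods in that namespace that match rc.Spec.Selector. (For what matchesAndControlled and matchesNeedsController mean, see PodControllerRefManager.Classify, defined in pkg/controller/controller_ref_manager.go:57.)
5. If step 3 found that the RC needs a sync and its DeletionTimestamp is nil, call manageReplicas to bring the number of active Pods managed by the RC in line with the desired count.
6. Whether or not manageReplicas ran, recompute the RC's status with calculateStatus, updating Conditions, Replicas, FullyLabeledReplicas, ReadyReplicas, and AvailableReplicas.
7. Call updateReplicationControllerStatus, which goes through kube-apiserver, to write the newly computed status, then return the error from manageReplicas, if any.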
In the syncReplicationController flow above, a key step is the manageReplicas call in step 5, which repairs the RC's replicas by adding or deleting Pods.
pkg/controller/replication/replication_controller.go:516

```go
// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
	diff := len(filteredPods) - int(*(rc.Spec.Replicas))
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wg.Done()
				var err error
				if rm.garbageCollectorEnabled {
					var trueVar = true
					controllerRef := &metav1.OwnerReference{
						APIVersion: getRCKind().GroupVersion().String(),
						Kind:       getRCKind().Kind,
						Name:       rc.Name,
						UID:        rc.UID,
						Controller: &trueVar,
					}
					err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
				} else {
					err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
				}
				if err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
		return nil
	}

	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
	// No need to sort pods if we are about to delete all of them
	if *(rc.Spec.Replicas) != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rs will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}
	return nil
}
```
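The main logic of manageReplicas is:
- First compute diff, the number of Pods in filteredPods minus the desired count in rc.Spec.Replicas.
- If diff is 0, the current state matches the desired state; return.
- If diff is negative, there are too few active Pods:
  - Compare |diff| with burstReplicas so that at most burstReplicas Pods are created this round.
  - Call expectations.ExpectCreations to set add in expectations to |diff|, meaning |diff| new Pods must be created to reach the desired state.
  - Use a sync.WaitGroup to start |diff| goroutines. Each calls podControl.CreatePods (or CreatePodsWithControllerRef when GC is enabled) to create one Pod from the RC's spec Template in the RC's namespace; if a creation fails, it decrements the expected creations via CreationObserved.
  - After all goroutines finish, return the first error if one or more creations failed; otherwise return nil.
- If diff is positive, there are more active Pods than desired:
  - Compare diff with burstReplicas so that at most burstReplicas Pods are deleted this round.
  - Unless rc.Spec.Replicas is 0, sort filteredPods so that not-ready < ready, unscheduled < scheduled, and pending < running, so that Pods in earlier stages are deleted first.
  - After sorting, take the first diff Pods as the Pods to delete.
  - Call expectations.ExpectDeletions with those Pods' keys, setting del in expectations to diff, meaning diff Pods must be deleted to reach the desired state.
  - Use a sync.WaitGroup to start diff goroutines, each calling podControl.DeletePod to delete one of the chosen Pods; if a deletion fails, it decrements the expected deletions via DeletionObserved.
  - After all goroutines finish, return the first error if one or more deletions failed; otherwise return nil.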
That covers what I consider the key code. The calculateStatus and updateReplicationControllerStatus methods are fairly simple; take a look yourself if you are interested.
Summary
- ReplicationManager is the code implementation of the ReplicationController controller; the name distinguishes it from the ReplicationController resource object.
- When kube-controller-manager calls newControllerInitializers to initialize its controllers, it registers startReplicationController, which starts the ReplicationController controller.
- newReplicationManager performs the following key configuration of the ReplicationManager:
  - It configures queue via workqueue.NewNamedRateLimitingQueue.
  - It configures expectations via controller.NewUIDTrackingControllerExpectations.
  - It configures rcStore, podStore, rcController, and podController.
  - It sets syncHandler to rm.syncReplicationController. syncReplicationController is the method that does the core work; automatic replica maintenance is essentially all done by it.
- Run is the starting point of the work; it begins watching and syncing:
  - watching
    - go rm.rcController.Run(stopCh) watches all RCs.
    - go rm.podController.Run(stopCh) watches all Pods.
  - syncing
    - Starts as many goroutines as workers.
    - Each goroutine runs rm.worker under wait.Until with a one-second period; rm.worker takes RCs off the queue and calls syncHandler to sync them.
    - Each goroutine exits only when stopCh is closed.
- In the syncReplicationController flow, the core step is the call to manageReplicas, which repairs the RC's replicas by adding or deleting Pods.