openshift底层是通过kubelet来管理pod,kubelet通过CNI插件来配置pod网络。openshift node节点在启动时会在一个goroutine中启动kubelet,由kubelet来负责pod的管理工作。
本文主要从源码的角度入手,简单分析在openshift环境下kubelet是如何通过调用openshift sdn插件来配置pod网络。
1podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
1// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error). 2func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) { 3 podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt) 4 if err != nil { 5 message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err) 6 glog.Error(message) 7 return "", message, err 8 } 910 // Create pod logs directory11 err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)12 if err != nil {13 message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)14 glog.Errorf(message)15 return "", message, err16 }1718 podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)19 if err != nil {20 message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)21 glog.Error(message)22 return "", message, err23 }2425 return podSandBoxID, "", nil26}
该方法首先会调用generatePodSandboxConfig来生成pod sandbox配置文件,然后调用MkdirAll方法来创建pod的日志目录,最后调用RunPodSandbox来完成具体的pod创建工作。
RunPodSandbox方法位于pkg/kubelet/dockershim/docker_sandbox.go#L79, 内容如下:
1// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure 2// the sandbox is in ready state. 3// For docker, PodSandbox is implemented by a container holding the network 4// namespace for the pod. 5// Note: docker doesn't use LogDirectory (yet). 6func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { 7 config := r.GetConfig() 8 9 // Step 1: Pull the image for the sandbox.10 image := defaultSandboxImage11 podSandboxImage := ds.podSandboxImage12 if len(podSandboxImage) != 0 {13 image = podSandboxImage14 }1516 // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.17 // see: // Only pull sandbox image when it's not present - v1.PullIfNotPresent.19 if err := ensureSandboxImageExists(ds.client, image); err != nil {20 return nil, err21 }2223 // Step 2: Create the sandbox container.24 createConfig, err := ds.makeSandboxDockerConfig(config, image)25 if err != nil {26 return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)27 }28 createResp, err := ds.client.CreateContainer(*createConfig)29 if err != nil {30 createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)31 }3233 if err != nil || createResp == nil {34 return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)35 }36 resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}3738 ds.setNetworkReady(createResp.ID, false)39 defer func(e *error) {40 // Set networking ready depending on the error return of41 // the parent function42 if *e == nil {43 ds.setNetworkReady(createResp.ID, true)44 }45 }(&err)4647 // Step 3: Create Sandbox Checkpoint.48 if err = ds.checkpointHandler.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {49 return nil, err50 }5152 // Step 4: Start the sandbox container.53 
// Assume kubelet's garbage collector would remove the sandbox later, if54 // startContainer failed.55 err = ds.client.StartContainer(createResp.ID)56 if err != nil {57 return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)58 }5960 // Rewrite resolv.conf file generated by docker.61 // NOTE: cluster dns settings aren't passed anymore to docker api in all cases,62 // not only for pods with host network: the resolver conf will be overwritten63 // after sandbox creation to override docker's behaviour. This resolv.conf64 // file is shared by all containers of the same pod, and needs to be modified65 // only once per pod.66 if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {67 containerInfo, err := ds.client.InspectContainer(createResp.ID)68 if err != nil {69 return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)70 }7172 if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {73 return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)74 }75 }7677 // Do not invoke network plugins if in hostNetwork mode.78 if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {79 return resp, nil80 }8182 // Step 5: Setup networking for the sandbox.83 // All pod networking is setup by a CNI plugin discovered at startup time.84 // This plugin assigns the pod ip, sets up routes inside the sandbox,85 // creates interfaces etc. 
In theory, its jurisdiction ends with pod86 // sandbox networking, but it might insert iptables rules or open ports87 // on the host as well, to satisfy parts of the pod spec that aren't88 // recognized by the CNI standard yet.89 cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)90 err =, config.GetMetadata().Name, cID, config.Annotations)91 if err != nil {92 // TODO(random-liu): Do we need to teardown network here?93 if err := ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod); err != nil {94 glog.Warningf("Failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)95 }96 }97 return resp, err98}
在上面代码的Step 1中,首先通过调用ensureSandboxImageExists方法来拉取pod infra容器的镜像,确保在infra容器创建时镜像已经在本地。该方法的定义位于pkg/kubelet/dockershim/helpers.go#L316,内容如下:
1func ensureSandboxImageExists(client libdocker.Interface, image string) error { 2 _, err := client.InspectImageByRef(image) 3 if err == nil { 4 return nil 5 } 6 if !libdocker.IsImageNotFoundError(err) { 7 return fmt.Errorf("failed to inspect sandbox image %q: %v", image, err) 8 } 910 repoToPull, _, _, err := parsers.ParseImageName(image)11 if err != nil {12 return err13 }1415 keyring := credentialprovider.NewDockerKeyring()16 creds, withCredentials := keyring.Lookup(repoToPull)17 if !withCredentials {18 glog.V(3).Infof("Pulling image %q without credentials", image)1920 err := client.PullImage(image, dockertypes.AuthConfig{}, dockertypes.ImagePullOptions{})21 if err != nil {22 return fmt.Errorf("failed pulling image %q: %v", image, err)23 }2425 return nil26 }2728 var pullErrs []error29 for _, currentCreds := range creds {30 authConfig := credentialprovider.LazyProvide(currentCreds)31 err := client.PullImage(image, authConfig, dockertypes.ImagePullOptions{})32 // If there was no error, return success33 if err == nil {34 return nil35 }3637 pullErrs = append(pullErrs, err)38 }3940 return utilerrors.NewAggregate(pullErrs)41}
该方法会首先判断镜像在不在本地,如果已经存在于本地则直接返回,如果不存在则调用docker client拉取镜像,拉取镜像时还会处理认证相关的问题。
1func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) { 2 ctx, cancel := d.getTimeoutContext() 3 defer cancel() 4 // we provide an explicit default shm size as to not depend on docker daemon. 5 // TODO: evaluate exposing this as a knob in the API 6 if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 { 7 opts.HostConfig.ShmSize = defaultShmSize 8 } 9 createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)10 if ctxErr := contextError(ctx); ctxErr != nil {11 return nil, ctxErr12 }13 if err != nil {14 return nil, err15 }16 return &createResp, nil17}
该方法实际上是通过d.client.ContainerCreate调用docker client来创建容器,最终也就是调用docker的remote api来创建容器。
1func (d *kubeDockerClient) StartContainer(id string) error {2 ctx, cancel := d.getTimeoutContext()3 defer cancel()4 err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})5 if ctxErr := contextError(ctx); ctxErr != nil {6 return ctxErr7 }8 return err9}
在RunPodSandbox方法的Step 5中,调用了network plugin的SetUpPod方法来配置pod网络。该方法位于pkg/kubelet/network/plugins.go#L406,内容如下:
1func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID, annotations map[string]string) error { 2 defer recordOperation("set_up_pod", time.Now()) 3 fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) 4 pm.podLock(fullPodName).Lock() 5 defer pm.podUnlock(fullPodName) 6 7 glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName) 8 if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations); err != nil { 9 return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err)10 }1112 return nil13}
该方法的主要逻辑是调用pm.plugin.SetUpPod,即plugin的SetUpPod方法。这里plugin是一个interface,具体使用哪个plugin是由kubelet的启动参数--network-plugin决定的,openshift在启动kubelet时传递的参数是--network-plugin=cni,也就是调用cni插件的SetUpPod方法。该方法的定义位于:pkg/kubelet/network/cni/cni.go#L208,内容如下:
1func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { 2 if err := plugin.checkInitialized(); err != nil { 3 return err 4 } 5 netnsPath, err := 6 if err != nil { 7 return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) 8 } 910 // Windows doesn't have loNetwork. It comes only with Linux11 if plugin.loNetwork != nil {12 if _, err = plugin.addToNetwork(plugin.loNetwork, name, namespace, id, netnsPath); err != nil {13 glog.Errorf("Error while adding to cni lo network: %s", err)14 return err15 }16 }1718 _, err = plugin.addToNetwork(plugin.getDefaultNetwork(), name, namespace, id, netnsPath)19 if err != nil {20 glog.Errorf("Error while adding to cni network: %s", err)21 return err22 }2324 return err25}
该方法先调用GetNetNS找到pod所在的netnamespace的路径,该值在后续配置网络时会用到,然后如果系统是linux的话,会调用addToNetwork来配置loopback设备的网络,最后调用addToNetwork来配置pod eth0接口的网络。这里需要关注一下最后一次addToNetwork调用中使用的getDefaultNetwork这个方法,该方法的源码位于pkg/kubelet/network/cni/cni.go#L177, 内容如下:
1func (plugin *cniNetworkPlugin) getDefaultNetwork() *cniNetwork {2 plugin.RLock()3 defer plugin.RUnlock()4 return plugin.defaultNetwork5}
该方法返回plugin.defaultNetwork,该值最终是调用getDefaultCNINetwork方法获取,源码位于pkg/kubelet/network/cni/cni.go#L95, 内容如下:
1func getDefaultCNINetwork(pluginDir, binDir, vendorCNIDirPrefix string) (*cniNetwork, error) { 2 if pluginDir == "" { 3 pluginDir = DefaultNetDir 4 } 5 files, err := libcni.ConfFiles(pluginDir, []string{".conf", ".conflist", ".json"}) 6 switch { 7 case err != nil: 8 return nil, err 9 case len(files) == 0:10 return nil, fmt.Errorf("No networks found in %s", pluginDir)11 }1213 sort.Strings(files)14 for _, confFile := range files {15 var confList *libcni.NetworkConfigList16 if strings.HasSuffix(confFile, ".conflist") {17 confList, err = libcni.ConfListFromFile(confFile)18 if err != nil {19 glog.Warningf("Error loading CNI config list file %s: %v", confFile, err)20 continue21 }22 } else {23 conf, err := libcni.ConfFromFile(confFile)24 if err != nil {25 glog.Warningf("Error loading CNI config file %s: %v", confFile, err)26 continue27 }28 // Ensure the config has a "type" so we know what plugin to run.29 // Also catches the case where somebody put a conflist into a conf file.30 if conf.Network.Type == "" {31 glog.Warningf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile)32 continue33 }3435 confList, err = libcni.ConfListFromConf(conf)36 if err != nil {37 glog.Warningf("Error converting CNI config file %s to list: %v", confFile, err)38 continue39 }40 }41 if len(confList.Plugins) == 0 {42 glog.Warningf("CNI config list %s has no networks, skipping", confFile)43 continue44 }45 confType := confList.Plugins[0].Network.Type4647 // Search for vendor-specific plugins as well as default plugins in the CNI codebase.48 vendorDir := vendorCNIDir(vendorCNIDirPrefix, confType)49 cninet := &libcni.CNIConfig{50 Path: []string{vendorDir, binDir},51 }52 network := &cniNetwork{name: confList.Name, NetworkConfig: confList, CNIConfig: cninet}53 return network, nil54 }55 return nil, fmt.Errorf("No valid networks found in %s", pluginDir)56}
1type cniNetwork struct {2 name string3 NetworkConfig *libcni.NetworkConfigList4 CNIConfig libcni.CNI5}
openshift node节点在启动时,会在/etc/cni/net.d目录下写入配置文件80-openshift-network.conf,内容如下:
{
  "cniVersion": "0.2.0",
  "name": "openshift-sdn",
  "type": "openshift-sdn"
}
所以上面的getDefaultCNINetwork的执行实际上是读取到了openshift sdn插件的相关配置。
接下来回到addToNetwork方法,该方法的定义位于pkg/kubelet/network/cni/cni.go#L248, 内容如下:
1func (plugin *cniNetworkPlugin) addToNetwork(network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string) (cnitypes.Result, error) { 2 rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath) 3 if err != nil { 4 glog.Errorf("Error adding network when building cni runtime conf: %v", err) 5 return nil, err 6 } 7 8 netConf, cniNet := network.NetworkConfig, network.CNIConfig 9 glog.V(4).Infof("About to add CNI network %v (type=%v)", netConf.Name, netConf.Plugins[0].Network.Type)10 res, err := cniNet.AddNetworkList(netConf, rt)11 if err != nil {12 glog.Errorf("Error adding network: %v", err)13 return nil, err14 }1516 return res, nil17}
// RuntimeConf carries the per-invocation settings that the runtime supplies
// when a network is applied to a container.
type RuntimeConf struct {
	// ContainerID identifies the container being configured.
	ContainerID string
	// NetNS is the container's network namespace (presumably a path — confirm
	// against the caller).
	NetNS string
	// IfName is the name of the interface to configure.
	IfName string
	// Args holds additional two-element string pairs passed along with the
	// request (presumably key/value — confirm against the caller).
	Args [][2]string
	// A dictionary of capability-specific data passed by the runtime
	// to plugins as top-level keys in the 'runtimeConfig' dictionary
	// of the plugin's stdin data. libcni will ensure that only keys
	// in this map which match the capabilities of the plugin are passed
	// to the plugin
	CapabilityArgs map[string]interface{}
}
1func (c *CNIConfig) AddNetworkList(list *NetworkConfigList, rt *RuntimeConf) (types.Result, error) { 2 var prevResult types.Result 3 for _, net := range list.Plugins { 4 pluginPath, err := invoke.FindInPath(net.Network.Type, c.Path) 5 if err != nil { 6 return nil, err 7 } 8 9 newConf, err := buildOneConfig(list, net, prevResult, rt)10 if err != nil {11 return nil, err12 }1314 prevResult, err = invoke.ExecPluginWithResult(pluginPath, newConf.Bytes, c.args("ADD", rt))15 if err != nil {16 return nil, err17 }18 }1920 return prevResult, nil21}
该方法首先调用FindInPath这个方法来找到plugin的路径,FindInPath会根据CNI配置的Type在/opt/cni/bin下面找到同名的插件,然后返回插件的绝对路径。我们以openshift sdn插件的配置为例,配置的内容如下:
{
  "cniVersion": "0.2.0",
  "name": "openshift-sdn",
  "type": "openshift-sdn"
}
doCNI: 该方法用于向CNIServer发送请求,openshift node节点在启动时会启动一个cniServer, 用于跟cni plugin进行通信,通信的流程下面会分析。
CmdAdd: 用于执行ADD请求,在设置pod网络时会被调用,比如上面在调用插件时传入了ADD参数就是调用这个方法。
CmdDel: 用于执行DEL请求,在删除pod网络时会被调用。
1func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) error { 2 req := newCNIRequest(args) 3 config, err := cniserver.ReadConfig(cniserver.CNIServerConfigFilePath) 4 if err != nil { 5 return err 6 } 7 8 var hostVeth, contVeth net.Interface 9 err = ns.WithNetNSPath(args.Netns, func(hostNS ns.NetNS) error { 10 hostVeth, contVeth, err = ip.SetupVeth(args.IfName, int(config.MTU), hostNS) 11 if err != nil { 12 return fmt.Errorf("failed to create container veth: %v", err) 13 } 14 return nil 15 }) 16 if err != nil { 17 return err 18 } 19 result, err := p.doCNIServerAdd(req, hostVeth.Name) 20 if err != nil { 21 return err 22 } 23 24 // current.NewResultFromResult and ipam.ConfigureIface both think that 25 // a route with no gateway specified means to pass the default gateway 26 // as the next hop to ip.AddRoute, but that's not what we want; we want 27 // to pass nil as the next hop. So we need to clear the default gateway. 28 result020, err := types020.GetResult(result) 29 if err != nil { 30 return fmt.Errorf("failed to convert IPAM result: %v", err) 31 } 32 defaultGW := result020.IP4.Gateway 33 result020.IP4.Gateway = nil 34 35 result030, err := current.NewResultFromResult(result020) 36 if err != nil || len(result030.IPs) != 1 || result030.IPs[0].Version != "4" { 37 return fmt.Errorf("failed to convert IPAM result: %v", err) 38 } 39 40 // Add a sandbox interface record which ConfigureInterface expects. 41 // The only interface we report is the pod interface. 
42 result030.Interfaces = []*current.Interface{ 43 { 44 Name: args.IfName, 45 Mac: contVeth.HardwareAddr.String(), 46 Sandbox: args.Netns, 47 }, 48 } 49 result030.IPs[0].Interface = current.Int(0) 50 51 err = ns.WithNetNSPath(args.Netns, func(hostNS ns.NetNS) error { 52 // Set up eth0 53 if err := ip.SetHWAddrByIP(args.IfName, result030.IPs[0].Address.IP, nil); err != nil { 54 return fmt.Errorf("failed to set pod interface MAC address: %v", err) 55 } 56 if err := ipam.ConfigureIface(args.IfName, result030); err != nil { 57 return fmt.Errorf("failed to configure container IPAM: %v", err) 58 } 59 60 // Set up lo 61 link, err := netlink.LinkByName("lo") 62 if err == nil { 63 err = netlink.LinkSetUp(link) 64 } 65 if err != nil { 66 return fmt.Errorf("failed to configure container loopback: %v", err) 67 } 68 69 // Set up macvlan0 (if it exists) 70 link, err = netlink.LinkByName("macvlan0") 71 if err == nil { 72 err = netlink.LinkSetUp(link) 73 if err != nil { 74 return fmt.Errorf("failed to enable macvlan device: %v", err) 75 } 76 77 // A macvlan can't reach its parent interface's IP, so we need to 78 // add a route to that via the SDN 79 var addrs []netlink.Addr 80 err = hostNS.Do(func(ns.NetNS) error { 81 parent, err := netlink.LinkByIndex(link.Attrs().ParentIndex) 82 if err != nil { 83 return err 84 } 85 addrs, err = netlink.AddrList(parent, netlink.FAMILY_V4) 86 return err 87 }) 88 if err != nil { 89 return fmt.Errorf("failed to configure macvlan device: %v", err) 90 } 91 for _, addr := range addrs { 92 route := &netlink.Route{ 93 Dst: &net.IPNet{ 94 IP: addr.IP, 95 Mask: net.CIDRMask(32, 32), 96 }, 97 Gw: defaultGW, 98 } 99 if err := netlink.RouteAdd(route); err != nil {100 return fmt.Errorf("failed to add route to node IP: %v", err)101 }102 }103104 // Add a route to service network via SDN105 _, serviceIPNet, err := net.ParseCIDR(config.ServiceNetworkCIDR)106 if err != nil {107 return fmt.Errorf("failed to parse ServiceNetworkCIDR: %v", err)108 }109 route := 
&netlink.Route{110 Dst: serviceIPNet,111 Gw: defaultGW,112 }113 if err := netlink.RouteAdd(route); err != nil {114 return fmt.Errorf("failed to add route to service network: %v", err)115 }116 }117118 return nil119 })120 if err != nil {121 return err122 }123124 return result.Print()125}