本文尝试通过阅读源码的途径,了解elasticsearch
节点启动的大体流程。
1. 读取配置创建运行环境
运行环境,这里指的是Environment
对象,这个对象封装了Settings
对象(es配置),data
路径,plugins
路径,modules
路径,bin
路径,libs
路径,log
路径等。
相关源码:
/** Create an {@link Environment} for the command to use. Overrideable for tests. */
protected Environment createEnv(final Terminal terminal, final Map<String, String> settings) throws UserException {
final String esPathConf = System.getProperty("es.path.conf");
if (esPathConf == null) {
throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
}
return InternalSettingsPreparer.prepareEnvironment(Settings.EMPTY, terminal, settings, getConfigPath(esPathConf));
}
可以看到这里读取了es.path.conf
的配置,即es的各种配置文件所在路径。
个人认为这里没有必要读取es.path.conf
的配置,这个配置实际上可以不要的,因为已经有es.home
的配置了,并且Environment
类的构造函数里有这样的代码:
Environment(final Settings settings, final Path configPath, final Path tmpPath)
...
if (configPath != null) {
configFile = configPath.normalize();
} else {
configFile = homeFile.resolve("config");
}
...
}
默认的配置文件位置就是home
路径下的config
里面。
elasticsearch.yml
配置文件的加载在下面这个方法里面:
public static Environment prepareEnvironment(Settings input, Terminal terminal, Map<String, String> properties, Path configPath) {
...
output = Settings.builder(); // start with a fresh output
Path path = environment.configFile().resolve("elasticsearch.yml");
if (Files.exists(path)) {
try {
output.loadFromPath(path);
} catch (IOException e) {
throw new SettingsException("Failed to load settings from " + path.toString(), e);
}
}
...
return new Environment(output.build(), configPath);
}
加载出来的配置会封装成Settings
对象设置到Environment
里面。
2. 初始化节点
这块的代码都在Node
类的下面这个构造函数里面
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
...
logger.info("initializing ...");
...
logger.info("initialized");
...
}
把一些细节省略之后,能很明显看到两条我们启动es的时候经常看到的日志。
下面去看里面一些关键性的细节,先整体看下这个构造函数:
protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
boolean success = false;
{
// use temp logger just to say we are starting. we can't use it later on because the node name might not be set
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
logger.info("initializing ...");
}
try {
...
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
}
具体细节在这个try
里面,里面会打开很多资源,这些资源都会被添加到一个List
里面,如果初始化失败,在finally
里面会将这些资源关闭。
然后进入try
里面去看具体初始化节点的流程。
配置节点
originalSettings = environment.settings();
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
resourcesToClose.add(nodeEnvironment);
} catch (IOException ex) {
throw new IllegalStateException("Failed to create node environment", ex);
}
读取Environment
对象里面的配置,创建节点自己的配置NodeEnvironment
,里面的配置包括但不限于以下这些信息:
-
node.max_local_storage_nodes
,本地最大存储节点数量,默认1,即本地最多只能同时启动一个能存储数据的节点; -
node.id.seed
,生成节点id的随机数种子,默认是0; - 节点数据存放路径nodePath,默认是{esHomePath}/data/nodes/0,{esHomePath}/data/nodes/1等;
-
indice
数据存放路径,默认是{nodePath}/indices;
同时这里面随机生成了节点id,读取并打印出了磁盘、jvm信息,这里的heap size
是默认值1/4
物理内存:
[2018-10-19T17:52:35,480][INFO ][o.e.e.NodeEnvironment ] [0aIXj0y] using [1] data paths, mounts [[(F:)]], net usable_space [215gb], net total_space [231gb], types [NTFS]
[2018-10-19T17:53:12,209][INFO ][o.e.e.NodeEnvironment ] [0aIXj0y] heap size [7.9gb], compressed ordinary object pointers [true]
生成节点名称
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
检查node.name
是否有配置,如果用户没有配置,则取nodeId
前7位作为节点名称。
加载plugins和modules
this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
在PluginService
的构造函数里加载了所有的plugins
和modules
,过程中会打印出如下日志:
[2018-10-22T15:42:43,978][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [aggs-matrix-stats]
[2018-10-22T15:42:43,980][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [analysis-common]
[2018-10-22T15:42:43,981][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [ingest-common]
[2018-10-22T15:42:43,982][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [lang-expression]
[2018-10-22T15:42:43,983][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [lang-mustache]
[2018-10-22T15:42:43,983][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [lang-painless]
[2018-10-22T15:42:43,984][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [mapper-extras]
[2018-10-22T15:42:43,985][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [parent-join]
[2018-10-22T15:42:43,986][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [percolator]
[2018-10-22T15:42:43,987][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [rank-eval]
[2018-10-22T15:42:43,988][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [reindex]
[2018-10-22T15:42:43,989][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [repository-url]
[2018-10-22T15:42:43,990][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [transport-netty4]
[2018-10-22T15:42:43,991][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [tribe]
[2018-10-22T15:42:43,992][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-core]
[2018-10-22T15:42:43,993][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-deprecation]
[2018-10-22T15:42:43,994][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-graph]
[2018-10-22T15:42:43,994][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-logstash]
[2018-10-22T15:42:43,995][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-ml]
[2018-10-22T15:42:43,996][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-monitoring]
[2018-10-22T15:42:43,998][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-rollup]
[2018-10-22T15:42:43,999][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-security]
[2018-10-22T15:42:44,000][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-sql]
[2018-10-22T15:42:44,001][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-upgrade]
[2018-10-22T15:42:44,002][INFO ][o.e.p.PluginsService ] [localhost-debug] loaded module [x-pack-watcher]
[2018-10-22T15:42:45,130][INFO ][o.e.p.PluginsService ] [localhost-debug] no plugins loaded
初始化各模块
后面长达几百行的代码都是在构造各个模块及组件(AbstractComponent
类),如ScriptModule
、AnalysisModule
、SearchModule
、IndicesService
、ClusterService
、TransportService
等。
打印日志如下:
[2018-10-24T16:10:08,595][DEBUG][o.e.a.ActionModule ] Using REST wrapper from plugin org.elasticsearch.xpack.security.Security
[2018-10-24T16:10:08,856][INFO ][o.e.d.DiscoveryModule ] [localhost-debug] using discovery type [zen]
各个模块及组件分别承担什么功能,此处暂时不深究,实际上根据类名大致也能猜测出其作用。
3. 启动节点
这块代码在Node
类的start()
方法里面。
启动各组件
/**
* Start the node. If the node is already started, this method is no-op.
*/
public Node start() throws NodeValidationException {
if (!lifecycle.moveToStarted()) {
return this;
}
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("starting ...");
pluginLifecycleComponents.forEach(LifecycleComponent::start);
injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();
final ClusterService clusterService = injector.getInstance(ClusterService.class);
final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.start();
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
final MetaData onDiskMetadata;
try {
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
} else {
onDiskMetadata = MetaData.EMPTY_META_DATA;
}
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
} catch (IOException e) {
throw new UncheckedIOException(e);
}
...
}
这块start
了很多组件(Service
,都是AbstractComponent
的子类),具体逻辑暂时不深究,打印日志如下:
[2018-10-24T16:10:16,609][INFO ][o.e.n.Node ] [localhost-debug] starting ...
[2018-10-24T16:11:14,864][INFO ][o.e.t.TransportService ] [localhost-debug] publish_address {xx.xx.xx.xx:9300}, bound_addresses {xx.xx.xx.xx:9300}
验证node信息
validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService
.filterPlugins(Plugin
.class)
.stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
这块代码对node
进行启动验证,日志如下:
[2018-10-24T16:33:31,983][INFO ][o.e.b.BootstrapChecks ] [localhost-debug] bound or publishing to a non-loopback address, enforcing bootstrap checks
如果修改了elasticsearch.yml
里面network.host
或者其它方式(如http.host
,transport.host
)的host
的配置,而不是默认的127.0.0.1
,那么bootstrap check
如果检测到问题就不是warning
而是error
了。
这里可能会有一些问题报出来。比如配置-Xms和-Xmx的值不相同就会有问题,还有一些其它如file descriptors
之类的问题,网上都能找到解决办法。
join cluster
discovery.startInitialJoin();
这行代码尝试把当前node
加入cluster
,不同的Discovery
有不同的实现,默认实现是ZenDiscovery
。
if (initialStateTimeout.millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterState clusterState = clusterService.state();
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
if (clusterState.nodes().getMasterNodeId() == null) {
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) { latch.countDown(); }
@Override
public void onClusterServiceClose() {
latch.countDown();
}
@Override
public void onTimeout(TimeValue timeout) {
logger.warn("timed out while waiting for initial discovery state - timeout: {}",
initialStateTimeout);
latch.countDown();
}
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
}
}
加入cluster
需要先找到master
节点,找到master
节点需要时间,这里会进行等待,等到master
节点出现,或超时(默认30秒)结束。
启用http
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).start();
}
这里根据配置启用http
,默认是开启的,打印日志如下:
[2018-10-24T19:13:07,544][INFO ][o.e.x.s.t.n.SecurityNetty4HttpServerTransport] [localhost-debug] publish_address {xx.xx.xx.xx:9200}, bound_addresses {xx.xx.xx.xx:9200}
到这里当前节点就已经成功启动了,此时仍然有可能还没有选出master
节点,没有加入cluster
,但不要紧,虽然不能提供集群服务,但凭借此单节点还是能提供一些服务的(可以尝试发送http
请求试验)。