毕业10年了,终于下了决定换了个新公司。初来乍到新公司,kick off一个新的ES项目,打算借此机会去研究一下ES代码。一边网上搜了大量的源码分析的blog,顺着这些作者的思路去看代码,自己也一边把自己的思绪记录下来,顺带也可以看出ES5.x和ES2.x的区别。这算是做了10年码农第一次认认真真地去磨一个开源项目的代码,难免很多错漏,此系列仅用于本人记录和学习之用,如果有对ES5志同道合的朋友欢迎一起讨论。
在此之前看了一位朋友的源码分析,在每个分析之前都先把问题列出,带着问题来看源码,于是顺其思路来看他的blog,感觉这样学起来效果甚好,于是写这堆ES5.x源码分析我也打算用这种思路,帮助自己来理解。
带着问题读源码:
- 启动入口在哪个类里
- Elasticsearch 启动需要初始化些什么东西
- Elasticsearch 如何管理线程池
- service bean是如何注入,都有些什么modules
由于Elasticsearch 用的是Gradle 管理依赖,所以,把最新的代码pull下来之后,在根目录运行一把 gradle idea
先解决依赖,也可以让idea自动解决。其实Elasticsearch的包逻辑分明,熟悉ES各种功能的码农对每个包名应该都十分亲切吧,入口类还算简单,顾名思义,10秒即可定位出来。
找入口找main
方法。
public static void main(final String[] args) throws Exception {
// we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
// presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
System.setSecurityManager(new SecurityManager() {
@Override
public void checkPermission(Permission perm) {
// grant all permissions so that we can later set the security manager to the one that we want
}
});
LogConfigurator.registerErrorListener();
final Elasticsearch elasticsearch = new Elasticsearch();
int status = main(args, elasticsearch, Terminal.DEFAULT);
if (status != ExitCodes.OK) {
exit(status);
}
}
static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
return elasticsearch.main(args, terminal);
}
该类是一个Command 类,因此主要还是增加一些启停的hook,然后就是一开始先设置了一个SecurityManager,用意看注释吧,接着打印了一些基本参数后则进入init
方法,在方法里会调用Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
// Set the system property before anything has a chance to trigger its use
initLoggerPrefix();
// force the class initializer for BootstrapInfo to run before
// the security manager is installed
BootstrapInfo.init();
从方法里可以看出,ES采用手工干预SecurityManager的用意就是随时去操做一些设计permission相关的操作。紧接着会配置LogConfig,check PID file,还有Lucene Jar file的sanity check,接着很快就会调用Bootstrap.setup()
继续看看这个方法:
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
Settings settings = environment.settings();
try {
spawner.spawnNativePluginControllers(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
initializeNatives(
environment.tmpFile(),
BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
// initialize probes before the security manager is installed
initializeProbes();
if (addShutdownHook) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
}
}
});
}
try {
// look for jar hell
JarHell.checkJarHell();
} catch (IOException | URISyntaxException e) {
throw new BootstrapException(e);
}
// Log ifconfig output before SecurityManager is installed
IfConfig.logIfNecessary();
// install SM after natives, shutdown hooks, etc.
try {
Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
} catch (IOException | NoSuchAlgorithmException e) {
throw new BootstrapException(e);
}
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final Settings settings,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
BootstrapChecks.check(settings, boundTransportAddress, checks);
}
};
}
initializeNatives ()
方法是去调用native方法进行一些OS调用,比如我们最最熟悉的检查 mlock,而initializeProbes
则是初始化一些必要的managedBean,用于以后的检测process,os, jvm等状态。这些都是单例的类,并最终给其他服务调用。接着就是一个shutdown hook用来关闭资源,最后就去初始化本Node了,留意创建时复写的一个validate node的方法,就是ES5 里面检测是否起在非回路IP的一些检测就放在这里。
上面就回答了第一个问题和第二个问题的一半了,keep moving.
Node的构造函数有足足250多行,我们逐个来看...
try {
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
tmpSettings = TribeService.processSettings(tmpSettings);
// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
resourcesToClose.add(nodeEnvironment);
} catch (IOException ex) {
throw new IllegalStateException("Failed to create node environment", ex);
}
final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
Logger logger = Loggers.getLogger(Node.class, tmpSettings);
final String nodeId = nodeEnvironment.nodeId();
tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
if (DiscoveryNode.nodeRequiresLocalStorage(tmpSettings)) {
checkForIndexDataInDefaultPathData(tmpSettings, nodeEnvironment, logger);
}
// this must be captured after the node name is possibly added to the settings
final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
if (hadPredefinedNodeName == false) {
logger.info("node name [{}] derived from node ID [{}]; set [{}] to override", nodeName, nodeId, NODE_NAME_SETTING.getKey());
} else {
logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
}
这里花了很大边幅去初始化这个nodeEnvironment,我个人觉得应该是就是为了要尽快确定nodeName,因为这个nodeName最后会绑定输出哪些东西到log,也会影响日志的输出。从addNodeNameIfNeeded()
里看到,如果没有给出nodeName,则会用UUID的前7位来作为NodeName。
下面则是打印一下JvmInfo,代码不贴了。
接着,这里提提,在ES2.x时有一段处理default.path.data
的逻辑,而5不再support了,参考
Default settings are no longer supported
Previous versions of Elasticsearch allowed a user to set a default setting for any setting. The default setting was only applied if the actual setting was not already set. This feature was trappy, and the complexity that it introduced was prone to bugs. Due to this, we have elected to make a breaking change in a minor release to remove this feature with the exception of default.path.conf, default.path.data, and default.path.logs which remain to support packaging. A future version of Elasticsearch will remove support for these as well, so users should stop relying on this functionality.
接着就开始创建Executors 和threadPool了
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
super(settings);
assert Node.NODE_NAME_SETTING.exists(settings);
final Map<String, ExecutorBuilder> builders = new HashMap<>();
final int availableProcessors = EsExecutors.numberOfProcessors(settings);
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
for (final ExecutorBuilder<?> builder : customBuilders) {
if (builders.containsKey(builder.name())) {
throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
}
builders.put(builder.name(), builder);
}
this.builders = Collections.unmodifiableMap(builders);
threadContext = new ThreadContext(settings);
final Map<String, ExecutorHolder> executors = new HashMap<>();
for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
if (executors.containsKey(executorHolder.info.getName())) {
throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
}
logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
executors.put(entry.getKey(), executorHolder);
}
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
this.executors = unmodifiableMap(executors);
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduler.setRemoveOnCancelPolicy(true);
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();
}
这里比较重要,原来在ES的threadPool中,根据不同的类型分别分配了不同线程数的一个线程池,而executor由一个executorBuilder来提供,所以submit task的时候也需要指定不同的Name,例如代码里的GENERIC
的线程池就从4 可以一直延伸到512。最后创建一个1线程的scheduler来执行定时任务。最后创建一个执行timer的线程。
再继续往下看Node的构造方法就会看到接下来会new 一堆的services和modules,这里就不一一过了,其共性就是都会绑定刚刚创建的threadPool,已经也会绑定必要的services,某些module本身具有后台线程的话,初始化完成需要调用.start()
去启动这些后台线程。
if (networkModule.isHttpEnabled()) {
httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
httpBind = b -> {
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
};
} else {
httpBind = b -> {
b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
};
httpServerTransport = null;
}
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService());
NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
modules.add(b -> {
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
b.bind(NodeClient.class).toInstance(client);
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(TribeService.class).toInstance(tribeService);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
b.bind(MetaStateService.class).toInstance(metaStateService);
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
scriptModule.getScriptService()));
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings, xContentRegistry,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders));
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
indicesService, recoverySettings, clusterService));
b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
transportService, recoverySettings, clusterService));
}
httpBind.accept(b);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
}
);
injector = modules.createInjector();
回答完第2,3个问题,从上面代码看到,回答最后一个问题了,从最后一个代码就可以看到,ES采用了Guice
来负责初始化bean和解决bean之间的依赖,我感觉不用Spring的原因是Spring太耗资源了,而不是网上说的觉得Guice
绑定速度超快,这些都是单例,其实依赖注入再快也省不到些什么时间。从代码可以看到它的语法是b.bind(依赖类型).to(返回的Bean)
。最后上面最后这段代码顺带提一下几个比较重要的模块,如Discovery,ClusterService,Transport Service,还创建了NodeClient用来接收全部其他节点请求。这些都会在往后重点剖析。
全部搞定后绑定端口然后回到Bootstrap
里调用.start()
开始服务。
第一次看代码可能理解不尽相同,如有纰漏欢迎指正。