做一个高性能的java流式存储项目你需要知道的一些事儿

1、目前能够在网上搜到的java相关的高性能文件io文章都比较基础,想深入的话需要既了解java的文件操作api原理,又了解文件操作相关的系统调用,这就造成了学习困难
2、实际上elasticsearch、kafka、rocketmq里关于文件操作的java实现已经很好,本文也参考了其中不少代码实现
3、本文的每个结论均提供测试代码验证,方便读者在自己的机器上验证
4、本文贴出的jdk native源码版本是openjdk的jdk8-b120
5、本文需要读者对java文件操作、虚拟内存、物理内存、pagecache有一定基础了解
6、本文的测试代码需要在TEST_PATH目录下提前准备test1-test30文件(根据自己测试环境调整),可以通过fallocate -l 1G test1快速创建30个1g大小的测试文件
7、测试代码依赖如下,引入netty只是为了使用其中的PlatformDependent工具类,引入JNA是为了执行系统调用

        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>1.37</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>1.37</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.101.Final</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>5.13.0</version>
        </dependency>

基础部分

FileChannel、MappedByteBuffer的初始化和释放

  • FileChannel的map函数是对系统调用mmap的阉割式封装,仅提供三种mode,READ_ONLY/READ_WRITE/PRIVATE对应mmap的prot、flags的部分组合,下面会贴出jdk源码,mmap系统调用原型:void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
  • 直接使用netty的工具类释放MappedByteBuffer,原理不展开了,读者可以额外查阅资料学习,也可以看我之前的文章https://www.jianshu.com/p/4f026fe063aa
  • 不论是何种方式获取的FileChannel,仅需要关闭FileChannel本身。此处无需关闭RandomAccessFile
  • 需要注意关闭FileChannel是不会释放MappedByteBuffer的,也就是说仅关闭FileChannel后MappedByteBuffer仍然可以继续使用
    FileChannel channel = new RandomAccessFile(TEST_PATH + "test1", "rw").getChannel();
    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    PlatformDependent.freeDirectBuffer(mappedByteBuffer);
    fileChannel.close();
  • jdk map native源码部分核心如下,可以看到三种mode只对应mmap中prot和flags的三种组合,这也是我为什么说map方法是对mmap系统调用的阉割封装,mmap还有很多实用的flag我们在java中没法直接使用

完整源码:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileChannelImpl.c

    Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len)
    {
        if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
            protections = PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
            protections = PROT_WRITE | PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
            protections =  PROT_WRITE | PROT_READ;
            flags = MAP_PRIVATE;
        }
        mapAddress = mmap64(
                0,                    /* Let OS decide location */
                len,                  /* Number of bytes to map */
                protections,          /* File permissions */
                flags,                /* Changes are shared */
                fd,                   /* File descriptor of mapped file */
                off);                 /* Offset into file */
    }
  • FileChannel close最终调用FileChannelImpl的implCloseChannel方法,源码如下

可以看到会对parent执行close,所以对FileChannel执行close就会对RandomAccessFile执行close。其实对RandomAccessFile执行close也会对FileChannel执行close,所以这俩关闭一个就行。一般我们只把RandomAccessFile当做获取FileChannel的工具人,不会保留它的引用,所以最终只会执行FileChannel的close

    protected void implCloseChannel() throws IOException {
        if (this.fileLockTable != null) {
            Iterator var1 = this.fileLockTable.removeAll().iterator();
            while(var1.hasNext()) {
                FileLock var2 = (FileLock)var1.next();
                synchronized(var2) {
                    if (var2.isValid()) {
                        this.nd.release(this.fd, var2.position(), var2.size());
                        ((FileLockImpl)var2).invalidate();
                    }
                }
            }
        }
        this.threads.signalAndWait();
        if (this.parent != null) {
            ((Closeable)this.parent).close();
        } else {
            this.nd.close(this.fd);
        }
    }

FileChannel write和DirectBuffer的关系

  • 源Buffer是DirectBuffer则直接写入,是HeapBuffer则在cache中拿一块DirectBuffer,把数据拷贝进去再写入
  • 这个cache由jdk维护,可以看到里面的DirectBuffer没有主动释放逻辑,只能随着gc释放,因此有oom隐患
  • 所以最佳实践是用户自己构建DirectBuffer写入,并自己控制该DirectBuffer的释放,避免数据拷贝,也避免堆外内存的OOM
  • write方法最后使用sun.nio.ch.IOUtil的write方法,核心源码如下
    static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
            int var5 = var1.position();
            int var6 = var1.limit();
            int var7 = var5 <= var6 ? var6 - var5 : 0;
            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
            int var10;
            try {
                var8.put(var1);
                var8.flip();
                var1.position(var5);
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }
                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }
            return var10;
        }
    }

FileChannel read和DirectBuffer的关系

  • 和write同理,直接使用DirectBuffer最好
  • 最终调用sun.nio.ch.IOUtil的read方法,核心源码如下
    static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
            int var7;
            try {
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();
                if (var6 > 0) {
                    var1.put(var5);
                }
                var7 = var6;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var5);
            }
            return var7;
        }
    }

FileChannel force参数true/false的区别

  • 对文件的写入其实都不会直接写入磁盘(directIO除外),只会写到buffer中,由操作系统统一调度写入磁盘,所以如果需要实时存储就需要主动调用force方法直接写入磁盘
  • jdk文档表示对FileChannel执行force不会保证对该FileChannel通过map方法获得的MappedByteBuffer进行刷盘,如果对MappedByteBuffer有修改需要刷盘,需要调用MappedByteBuffer自己的force方法
  • force方法有参数true/false,jdk文档说的不够具体,只说true的时候会额外写入metadata。实际上true/false对应的系统调用分别是fdatasync/fsync(两个系统调用的详细区别本文不展开),核心源码如下
  • fsync会比fdatasync多一次寻址,将文件大小、修改时间等metadata写入磁盘,性能会比fdatasync差一点,在文件大小固定的情况下可以仅调用fdatasync,文件大小不固定的情况下调用fdatasync没来的及更新文件大小可能会造成丢数据。其实由下面的压测结果可知这两个系统调用性能差距也不是很大(这里可能是固态硬盘和机械硬盘的区别?机械硬盘可能差距更大)

完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileDispatcherImpl.c

    Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this, jobject fdo, jboolean md)
    {
        if (md == JNI_FALSE) {
            result = fdatasync(fd);
        } else {
            result = fsync(fd);
        }
    }

FileChannel的transferTo方法介绍

对该方法的解析可以看我之前的文章:https://www.jianshu.com/p/11ed05ca62ff

  • transferTo可以利用sendFile系统调用做到真正的零拷贝传输数据,但是仅限于文件到文件,文件到socket两条路
  • 在文件下载场景可以使用该方法,将文件直接transferTo到socket中,我们自己的程序中是无法感知到文件内容的。最后通过文件hash值判断文件是否正确完整下载
  • 在非文件下载场景基本无法使用transferTo,因为我们的程序逻辑里大多需要从磁盘文件中读取到数据,然后做自己的业务处理,再将数据写入socket
  • 如果硬用transferTo做文件读取(文件到socket),考虑到下面的性能测试,我认为整体也不会比用MappedByteBuffer性能好

MappedByteBuffer load方法

  • 先执行load0 native方法,然后根据页大小,对每个页读取了一下,最后通过一个累加的x避免编译器认为这段代码是dead code优化掉
  • 可以看到load方法的目标就是主动触发缺页,从而将文件内容真正填充进物理内存中
  • MappedByteBuffer一开始仅仅是虚拟内存,不会分配真正的物理内存,用到的时候才会分配
    public final MappedByteBuffer load() {
        load0(mappingAddress(offset), length);
        // Read a byte from each page to bring it into memory. A checksum
        // is computed as we go along to prevent the compiler from otherwise
        // considering the loop as dead code.
        Unsafe unsafe = Unsafe.getUnsafe();
        int ps = Bits.pageSize();
        int count = Bits.pageCount(length);
        long a = mappingAddress(offset);
        byte x = 0;
        for (int i=0; i<count; i++) {
            x ^= unsafe.getByte(a);
            a += ps;
        }
        if (unused != 0)
            unused = x;
        return this;
    }
  • load0核心源码如下,主要就是执行系统调用madvise并且传入参数MADV_WILLNEED,本文对madvise不做展开,读者可以自行查阅

可以看出load0的作用是告诉操作系统这段内存接下来willneed,操作系统可以对这段内存做些优化处理,比如预读和提前加载之类的,可以加快后续对每个页都触发填充的效率
完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_load0(JNIEnv *env, jobject obj, jlong address, jlong len)
    {
        int result = madvise((caddr_t)a, (size_t)len, MADV_WILLNEED);
    }

MappedByteBuffer isLoad方法

  • 主要就是直接调用isLoaded0 native方法
    public final boolean isLoaded() {
        return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length));
    }
  • isLoaded0核心源码如下,通过系统调用mincore检查每个页是否都在物理内存中,mincore不做展开,读者可以自行查阅

完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_isLoaded0(JNIEnv *env, jobject obj, jlong address, jlong len, jint numPages)
    {
        jboolean loaded = JNI_TRUE;
        unsigned char *vec = (unsigned char *)malloc(numPages * sizeof(char));
        mincore(address, (size_t)len, vec);
        for (i=0; i<numPages; i++) {
            if (vec[i] == 0) {
                loaded = JNI_FALSE;
                break;
            }
        }
        return loaded;
    }

调用了MappedByteBuffer的load方法后,isLoad不一定一直为true

当pagecache不够用了的时候,操作系统会将cache按照规则释放或者放入swap区,测试代码如下,如果测试电脑的可用内存小于30g,基本上最后会显示false

    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "/test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        byteBuffer.load();
        new Thread(() -> {
            try {
                for (int i = 2; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    buffer.load();
                }
                TimeUnit.DAYS.sleep(1);
            } catch (Exception e) {
            }
        }).start();
        TimeUnit.SECONDS.sleep(20);
        System.out.println(byteBuffer.isLoaded());
    }

FileChannel和MappedByteBuffer读写测试

  • 大部分研究java文件io的应该都看过这篇文章,或者各种转载抄袭的复制品,但是并不知道他是如何测试的,因此对这篇文章的结论不可全信

http://thinkinjava.cn/2019/05/12/2019/05-12-java-nio/

  • 使用JMH进行基准测试,对JMH不做展开,读者可以自行查阅,推荐资料

https://developer.aliyun.com/article/899469
https://dunwu.github.io/java-tutorial/pages/747d3e/#warmup

  • 本测试力求单纯测试读写,尽量避免其他的开销
  • 本测试仅读写文件的第一个页,避免pagecache的不确定性影响,如何获得当前页大小在下面的进阶部分有说明
  • 本测试使用的文件均固定大小1G,提前创建,参照本文最开始的说明(在文章最上面)
  • 测试读取时使用的目标ByteBuffer和数组均通过Threadlocal保存复用,避免每次创建ByteBuffer和数组的开销干扰
  • 测试写入时使用的源ByteBuffer和数组提前创建并共用,避免每次写入都创建的干扰开销
  • 测试包括如下几项

从FileChannel读到HeapByteBuffer
从FileChannel读到DirectByteBuffer
从HeapByteBuffer写入FileChannel
从DirectByteBuffer写入FileChannel
从DirectByteBuffer写入FileChannel并force(false)
从DirectByteBuffer写入FileChannel并force(true)
从MappedByteBuffer读到数组
从数组写入MappedByteBuffer
从HeapByteBuffer写入MappedByteBuffer
从DirectByteBuffer写入MappedByteBuffer
从DirectByteBuffer写入MappedByteBuffer并force

  • Linux version 3.10.0-327.ali2017.alios7.x86_64 16c32g的机器测试结果如下,可以说在文件大小固定的前提下MappedByteBuffer全面完胜FileChannel

FileChannel读到DirectBuffer比读到HeapBuffer性能好,如上文所述,符合预期
FileChannel写入DirectBuffer比HeapBuffer,如上文所述,符合预期
FileChannel force(false)比force(true)好一些
MappedByteBuffer读到数组中有压倒性读取优势,比FileChannel高两个数量级
MappedByteBuffer写入数组数据性能好一些,写入DirectBuffer和HeapBuffer差不多
MappedByteBuffer比FileChannel的写入性能、force性能都好
mac上测试结果差不多,但是MappedByteBuffer从数组、HeapBuffer、DirectBuffer写入差距不像linux上明显

Benchmark                                               Mode  Cnt      Score   Error   Units
MyBenchmark.fileChannelRead2DirectBuffer               thrpt         968.349          ops/ms
MyBenchmark.fileChannelRead2HeapBuffer                 thrpt         669.176          ops/ms
MyBenchmark.fileChannelWriteFromDirectBuffer           thrpt         286.462          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForce      thrpt           3.992          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForceMeta  thrpt           3.169          ops/ms
MyBenchmark.fileChannelWriteFromHeapBuffer             thrpt         258.994          ops/ms
MyBenchmark.mappedRead2Array                           thrpt       67872.596          ops/ms
MyBenchmark.mappedWriteFromArray                       thrpt        1675.428          ops/ms
MyBenchmark.mappedWriteFromDirectBuffer                thrpt        1585.909          ops/ms
MyBenchmark.mappedWriteFromDirectBufferForce           thrpt           6.651          ops/ms
MyBenchmark.mappedWriteFromHeapBuffer                  thrpt        1559.150          ops/ms
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 10)
@Measurement(iterations = 1, time = 10)
@Threads(16)
@State(Scope.Benchmark)
public class MyBenchmark {
    private static final int SIZE = pageSize;
    private FileChannel fileChannel;
    private MappedByteBuffer mappedByteBuffer;
    private final ThreadLocal<ByteBuffer> fileChannelRead2HeapBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<ByteBuffer> fileChannelRead2DirectBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<byte[]> mappedRead2ArrayThreadLocal = new ThreadLocal<>();
    private final byte[] srcByteArray = new byte[SIZE];
    private final ByteBuffer srcHeapBuffer = ByteBuffer.allocate(SIZE);
    private final ByteBuffer srcDirectBuffer = ByteBuffer.allocateDirect(SIZE);

    @Setup
    public void setup() throws IOException {
        for (int i = 0; i < SIZE; i++) {
            srcByteArray[i] = 9;
        }
        srcHeapBuffer.put(srcByteArray, 0, SIZE);
        srcHeapBuffer.flip();
        srcDirectBuffer.put(srcByteArray, 0, SIZE);
        srcDirectBuffer.flip();
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        fileChannel = r.getChannel();
        mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    }

    @TearDown
    public void tearDown() throws IOException {
        PlatformDependent.freeDirectBuffer(mappedByteBuffer);
        fileChannel.close();
    }

    @Benchmark
    public void fileChannelRead2HeapBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2HeapBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocate(SIZE);
            fileChannelRead2HeapBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelRead2DirectBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2DirectBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocateDirect(SIZE);
            fileChannelRead2DirectBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromHeapBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcHeapBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForce(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(false);
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForceMeta(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(true);
    }

    @Benchmark
    public void mappedRead2Array(Blackhole blackhole) {
        byte[] array = mappedRead2ArrayThreadLocal.get();
        if (array == null) {
            array = new byte[SIZE];
            mappedRead2ArrayThreadLocal.set(array);
        }
        blackhole.consume(mappedByteBuffer.slice().get(array, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromArray(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcByteArray, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromHeapBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcHeapBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBufferForce(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
        mappedByteBuffer.force();
    }
}

进阶部分

java工程中通过JNA调用libc标准库函数

对JNA不做展开介绍,读者可以自行查阅学习:https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;

public interface LibC extends Library {
    LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
    // 是不是很熟悉,大家学的第一个函数应该就是这个吧:)
    void printf(String format, Object... args);
    // 本文的核心,后面详细介绍
    int mlock(Pointer var1, NativeLong var2);
}

通过mlock将锁定物理内存,避免内存被swap,保持物理内存常驻

mlock不做详细展开,读者可以自行查阅,推荐文章如下
http://www.daileinote.com/computer/linux_sys/32
https://www.cnblogs.com/linhaostudy/p/15972330.html

  • 其实mmap方法就可以直接设置flag为MAP_LOCKED将内存锁住,但是jdk没有提供该能力,所以我们需要才需要直接调用mlock锁住内存
  • 用户可以锁住的内存大小受操作系统限制,可以ulimit -l查看,mac和linux一般应该都是无限的unlimited
  • sysctl_max_map_count 规定了进程虚拟内存空间所能包含VmArea的最大个数,可以通过 /proc/sys/vm/max_map_count 内核参数来调整 sysctl_max_map_count,一次mmap对应产生一个VmArea
  • 对mmap的内存执行mlock后就已经触发了缺页将所有物理内存填充好了,也就不需要madvise,也不需要再调用MappedByteBuffer的load方法了,测试代码如下,最后byteBuffer.isLoaded()会显示true
    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        long address = PlatformDependent.directBufferAddress(byteBuffer);
        Pointer p = new Pointer(address);
        System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
        System.out.println(byteBuffer.isLoaded());
}
  • mlock之后无需调用munlock,会随着MappedByteBuffer的释放自动munlock(也符合文档说明,munmap自动munlock),测试代码如下

该测试程序可以永远执行下去,mlock的返回值也一直会是0(系统调用返回值0代表正常)
读者可以试试只调用mlock,但是不释放MappedByteBuffer的情况,小心电脑死机 : )

    public static void main(String[] args) throws IOException, InterruptedException {
        try {
            while (true) {
                for (int i = 1; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    long add = PlatformDependent.directBufferAddress(buffer);
                    Pointer p = new Pointer(add);
                    System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
                    channel.close();
                    PlatformDependent.freeDirectBuffer(buffer);
                }
            }
        } catch (Exception e) {
        }
  • windows环境没有mlock,需要调用kernel32.dll中的VirtualLock系统调用,按照JNA文档需要实现JNA的StdCallLibrary接口

https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

  • windows环境下要更复杂一点,VirtualLock受大小和个数影响。在elasticsearch中有使用到,可以参考源码学习,本文不做展开(感觉在windows上搞这个优化没啥意义)

https://github.com/elastic/elasticsearch/blob/951640b73f71909013f57645cd30e1f19d8c2323/server/src/main/java/org/elasticsearch/bootstrap/JNAKernel32Library.java#L30

获取程序运行机器的操作系统页大小

  • 参考netty中PlatformDependent0中执行Bits类中unaligned方法的过程
    private int pageSize = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
        try {
            Class<?> bitsClass = Class.forName("java.nio.Bits", false, PlatformDependent.getSystemClassLoader());
            Method pageSizeMethod = bitsClass.getDeclaredMethod("pageSize");
            pageSizeMethod.setAccessible(true);
            return (Integer) pageSizeMethod.invoke(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

总结

  • 在java领域实现高性能流式存储现在看就比较清晰了,固定文件大小,通过MappedByteBuffer配合系统调用mlock,将最新的文件mlock到物理内存中,保证新数据的实时读取
  • 根据调用请求分析将部分热点老文件也mlock住,并做动态调整,避免大量冷读造成读取性能下降
  • RocketMQ中文件预热部分有mlock相关使用,但是为了兼容使用预热和不使用预热两种情况,在预热部分有些冗余调用,可以理解。预热过程既然调用了mlock就无需对每个页写一个字节填充物理页了,也无需调用madvise
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容