以前自学机器学习的时候经常遇到一些,10几个G甚至更大的数据集,自己的电脑的存储不太够,就没有做那些数据集了。直到最近因分析需求需要清洗100G+的数据,才真正接触到真正意义上的大数据,过程中发现脚本跑得慢已经是次要问题,因文件太大了导致内存过多,以至于处理的过程中被系统Ctrl + C,看了一圈也问了大模型,总结了下面这些注意点。PS:下面说的“超量”指处理超过内存大小的文件。
以处理VCF文件为例
- 此处的VCF来自gnomad数据库,有兴趣可以去下载,要从大文件中提取想要的数据并整理成结构化的数据并非容易的事。
- VCF(variant calling format)是一种记录变异信息的
tsv文件
,包含了注释行
,表头
,数据
; -
VCF的数据结构如下所示:
1、如何“超量”读写文件内容
文件读取:逐行读取文件到内存中,而不是一次性把文件都读到内存里面,使用迭代器
yield
去读取和生成数据,而不是通过return
的方法将数据一次性返回,这样可以逐行读取和生成数据而不是一次性把所有数据都读到内存里面。读取文件:python有一个内置模块
csv
,用recorder(迭代器)的方式可以将数据逐行读入
import csv
with open("test.vcf", "r") as f:
record = csv.reader(f, delimiter="\t")
for info in record:
print(info)
- 生成数据:
# 生成
def extract_data(超大文件):
XXX #提取的步骤
yield data
- 文件写入:处理后的数据通过使用重定向的方法,将结果输出到stdout中(浅显理解就是把结果直接输出到屏幕中)而不是在python中用
with open
的方式在脚本中打开写入的文件。随着数据的写入,with open
打开的文件会越来越大
,最终导致脚本被中断Aborted
。
#法一
import sys
sys.stdout.write(data)
# 法二
print(data)
2、如何“超量”地处理大数据
超量处理数据
- 目标:为了更快地处理数据
- 策略:
MapReduce
请记住这个策略
MapReduce简介
- MapReduce的策略就是先把数据集切割成小块,每个小块用一样的方式处理,最后合并的思想。
- 分块处理的方式能够避免过多数据一次性读进内存里面而导致的程序崩溃,每个小块还可以通过多线程/多进程/分布式的方式分发给不同的线程/进程/机器一起处理数据,实现同时处理同一个文件从而提高处理的效率。下面记录python和linux命令行实现MapReduce的方法
Python实现Map Reduce
- 最直接的例子就是pandas的分块功能与多进程联用的例子
import pandas
#可以先事先定义一个处理df的函数
def clean_df(df):
do_anything df
return df
merge_data = pd.Dataframe()
for chunk in df:
tmp_df = clean_df(chunk)
merge_data = pd.concat([tmp_df, merge_data] )
linux实现MapReduce
- 工具:split、parrellel、cat、qsub(有超算资源的可以选择集群投递的方式将小文件投到计算节点计算)
- 思路:split分割大文件,parallel并行处理
split -n l/10 bigfile smallfile
# 分布式处理文件
# 法一
parallel -j 4 "cat {} | tr ' ' '\n' | sort | uniq -c" ::: smallfile*
# 法二
ls script.sh |xargs -I {} qsub -cwd -l vf=4g -pe smp 1 {}
# 合并文件
cat smallfile* | sort | uniq -c > result.txt
3、如何提高处理大数据的效率
使用隐式循环和列表推导式替代显式循环
- 显式循环和隐式循环的区别在于处理的值是否会在内存中保留
- 显式循环的例子
a = [1,2,3]
for i in a:
print(a+1)
#列表推导式
a = [i + 1 for i in a]
使用内置函数
尽量使用python内置函数,numpy等等因为这些函数的底层语言是C,以下提供一些常用的内置函数供大家参考。
- len()函数:用于获取字符串、列表、元组、字典等对象的长度。
# 获取字符串长度
string = "Hello, world!"
length = len(string)
# 获取列表长度
list = [1, 2, 3, 4, 5]
length = len(list)
# 获取元组长度
tuple = (1, 2, 3, 4, 5)
length = len(tuple)
# 获取字典长度
dictionary = {'name': 'John', 'age': 30}
length = len(dictionary)
sorted()函数:用于对列表、元组等对象进行排序。
# 对列表进行排序
list = [3, 1, 4, 2, 5]
sorted_list = sorted(list)
# 对元组进行排序
tuple = (3, 1, 4, 2, 5)
sorted_tuple = sorted(tuple)
# 对字典进行排序
dictionary = {'name': 'John', 'age': 30}
sorted_dictionary = sorted(dictionary.items())
- max()函数和min()函数:用于获取列表、元组等对象中的最大值和最小值。
# 获取列表中的最大值和最小值
list = [3, 1, 4, 2, 5]
max_value = max(list)
min_value = min(list)
# 获取元组中的最大值和最小值
tuple = (3, 1, 4, 2, 5)
max_value = max(tuple)
min_value = min(tuple)
- sum()函数:用于计算列表、元组等对象中所有元素的和。
# 计算列表中所有元素的和
list = [1, 2, 3, 4, 5]
sum = sum(list)
# 计算元组中所有元素的和
tuple = (1, 2, 3, 4, 5)
sum = sum(tuple)
- filter()函数:用于过滤列表、元组等对象中的元素。
# 过滤列表中的偶数
list = [1, 2, 3, 4, 5]
even_numbers = list(filter(lambda x: x % 2 == 0, list))
# 过滤元组中的奇数
tuple = (1, 2, 3, 4, 5)
odd_numbers = tuple(filter(lambda x: x % 2 != 0, tuple))
- map()函数:用于对列表、元组等对象中的每个元素应用一个函数,并返回一个新的列表。这个函数可以减少
for循环这样的操作
# 对列表中的每个元素进行平方运算
list = [1, 2, 3, 4, 5]
squared_list = list(map(lambda x: x**2, list))
# 对元组中的每个元素进行加1运算
tuple = (1, 2, 3, 4, 5)
incremented_tuple = tuple(map(lambda x: x+1, tuple))
- reduce()函数:用于对列表、元组等对象中的元素进行累积计算,并返回一个结果。
# 对列表中的元素进行累加运算
from functools import reduce
list = [1, 2, 3, 4, 5]
sum = reduce(lambda x, y: x+y, list)
# 对元组中的元素进行累乘运算
tuple = (1, 2, 3, 4, 5)
product = reduce(lambda x, y: x*y, tuple)
- zip()函数:用于将多个列表、元组等对象中的元素按照索引进行配对,并返回一个新的元组列表。根据两个列表生成字典数据的时候非常实用
# 将两个列表中的元素按照索引进行配对
list1 = [1, 2, 3, 4, 5]
list2 = ['a', 'b', 'c', 'd', 'e']
zipped_list = list(zip(list1, list2))
# 将两个元组中的元素按照索引进行配对
tuple1 = (1, 2, 3, 4, 5)
tuple2 = ('a', 'b', 'c', 'd', 'e')
zipped_tuple = list(zip(tuple1, tuple2))
- enumerate()函数:用于将列表、元组等对象中的元素和它们的索引配对,并返回一个新的元组列表。 遍历列表时
enumerate(list)
的性能优于range(len(list))
,且可以一次性返回列表的索引和值。
# 将列表中的元素和它们的索引进行配对
list = ['a', 'b', 'c', 'd', 'e']
enumerated_list = list(enumerate(list))
# 将元组中的元素和它们的索引进行配对
tuple = ('a', 'b', 'c', 'd', 'e')
enumerated_tuple = list(enumerate(tuple))
- any()函数和all()函数:用于判断列表、元组等对象中的元素是否满足某个条件。
# 判断列表中是否存在偶数
list = [1, 2, 3, 4, 5]
has_even_number = any(map(lambda x: x % 2 == 0, list))
# 判断元组中是否所有元素都是奇数
tuple = (1, 3, 5, 7, 9)
all_odd_numbers = all(map(lambda x: x % 2 != 0, tuple))
使用Cython写的第三方工具包
- c语言但python版本,python的性能限制也是由于语言本身的特性所导致的,因此将python转变为高性能语言会大大地提升脚本的性能。如使用
cyvcf2
解析器,numba
加速器均可提高脚本的性能。(重点有限,这两个包会另开日志分享)
总结
- 本日志记录了处理超大size文件的思路和具体做法。我一直思考入了生信分析的门之后,会写脚本之后呢?怎么提升?what's next?在遇到性能瓶颈后从各个细节修改自己的代码后,查看书本发现,提升性能是在脚本各个细节处实现的。在意脚本性能是一个契机,将学到的新东西慢慢融入进自己的工作流中,时间一长自然会发现自己的进步了。
另:感兴趣的话点个赞再走,接下来会继续梳理和分享感兴趣的点。