#正常
>序列名
位点信息
--------------
#异常1
>序列名
位点信息>序列名
位点信息
--------------
#异常2
>序列名
位点信息>序列名位点信息
--------------
如上所示,处理序列过程中很容易遇到序列名
与位点信息
混合的文件。对于异常1,比较好处理,>序列名换到下一行就行了,但是异常2就比较麻烦了。为了批量处理所以准备了,一个python脚本。
核心就是逐行读取,如果是序列名就开始一个字典的key,把后续的行储存进值作为序列。如果发现位点信息序列中包含'>',则分割'>'前面做位点信息
输入给上一个字典的key的值,'>'后面开始一个字典的key,后续行作为位点信息。
最后再检查一般所有的key和value,
import argparse
#
def get_file_list(infilename):
infile = open(infilename, "r")
inlines_list = infile.readlines()
infile.close()
cleaned_lines = [line.strip() for line in inlines_list]
return cleaned_lines
def fix_fasta_format(input_fasta):
#逐行读取数据
lines = get_file_list(input_fasta)
#初始字典准备
current_id = None
sequence_dict = {}
#循环每一行 每当遇到id时更新current_id
input_seqid=[]
add_seqid=[]
for line in lines:
if line.startswith('>'):
#制备seq_id
current_id = line[1:]
sequence_dict[current_id] = ""
input_seqid.append(current_id)
else:
#处理序列文件,如果序列行中有seq id信息 更新current_id 后续序列信息加入current_id
if ">" in str(line):
seq_line=line.split('>')[0]
sequence_dict[current_id] += seq_line
id_line=line.split('>')[1]
current_id=id_line
add_seqid.append(id_line)
sequence_dict[current_id] = ""
else:
sequence_dict[current_id] += line
#检查输出
print(f"Normal ID : {len(input_seqid)} ; Newly added ID : {len(add_seqid)}")
return sequence_dict
def mian_work(input_fasta,output_name):
sequence_dict=fix_fasta_format(input_fasta)
#output_name = str(input_fasta).split("/")[-1].split('.')[0]
output_fas = open("{0}.new.fasta".format(output_name), 'a+')
output_err = open("{0}.err.fasta".format(output_name), 'a+')
for seq_id, sequence in sequence_dict.items():
if sequence == "":
#输出异常序列
idline = '>' + seq_id
print(idline, file=output_err)
else:
#输出正常序列
idline = '>' + seq_id
print(idline, file=output_fas)
print(str(sequence), file=output_fas)
output_fas.close()
output_err.close()
if __name__ == "__main__":
# 设置输入使用argparse
parser = argparse.ArgumentParser(description='修改统一名')
parser.add_argument('--input','-i',
type=str,
help='输入序列')
parser.add_argument('--output_name','-o',
type=str,
help='输出文件名 {output_name}.new.fasta')
args = parser.parse_args()
input_fasta=args.input
output_name=args.output_name
mian_work(input_fasta,output_name)