生信中的python——从基因组中提目标序列
昨天组会后,和同门讲起曾经我刚进实验室的辛酸历程。2G内存的小服务器,硬是让我做人类基因组数据,于是写程序必须各种优化还得注意控制内存。所以有了今天的想法,聊聊怎么从基因组中提序列。实现的功能不重要,主要是讲一下怎么处理大文件,真正提序列可以用samtools等现有软件,不用重复造轮子。
- 问题描述
- 读取大文件
- 从简单的开始
- Unittest单元测试
写在前面:
有人给我打赏了!!!!
感谢感谢,希望我的内容对你们有帮助。我平时忙起来特别忙,闲起来特别闲=-=,如果有几个小时的空闲与其看个电影不如写写博客,自己也能学习一下。
生信中还是需要处理一些大数据的,比如组学数据或者测序数据。一般需求就是需要看一下数据内容,定位某一行的内容,查找某个东西,或者逐行进行数据处理等等。
如果内存足够大,在算法合理的情况下,理论上不用考虑大文件处理,全部塞到内存里去任性就行;不过如果内存有限,那么就需要点策略了。这次我们先来讨论一个
1. 问题描述
假设有这么个应用场景:
有一个比较大的fasta文件,只有一个染色体,如下:
>chr1A
ACATCGTAGCTAGCATCGGGGGGGGGGATGCATCGTACGTAGCTAGCTCATCGCGTGATG
CCGTACGTACGTAGCGCGCTCTAGCTCCGTAGCTAGCTAGCTAGTCGATCGTAGCTAGCT
....
我现在需要提取以下序列要怎么办?
#为gff3注释文件的部分列
chr1A mRNA 130446 131722 - ID=TRITD1Av1G000090.1
2. 处理大文件的特定行
python读文件一般是这么读的:
f=open("./demo.fa","r")
s=f.read()
这就相当于把demo.fa的全部内容全部放到内存(里面叫s的变量)。如果对于大文件这么搞就容易把服务器卡死。
python里面有几个方法都可以读大文件,比如with;
#with的方法。
#with会自带异常包,遇到问题抛出异常
#同时,他会把文件变成迭代器,自动进行内存管理,这也是我最常用的方案。
with open("demo.fa","r") as f:
for line in f:
#处理line
# 此外,还有:
# read(size)将文件分块,循环readline(),直接用fileinput包,seek方法等。
还有大佬推荐mmap包,感兴趣的可以试试。 不过这里,我们想对于fasta文件的某一行进行处理,2个策略:
- 一行一行读,记录每行长度,到指定行停下来开始处理
- 直接跳到某一行
如果你的fasta文件每行长度不同(通常是不可能的),那似乎只能用方法1了,或者linecache了解一下。
不过一般fasta格式的文件每行字符数是固定的,所以我们就用方法2,我们用f.seek这个函数。
那么开始写代码
3. 从简单的开始
首先需要做一个能用的东西:
Atlas编程哲学:先能用,再谈别的
为啥要写程序,不就是为了实现一个功能。没必要全部做好,先实现全部功能之后慢慢改。
如果是我要做上面这个问题,首先我会先设计一个简单的测试文件(尽可能多的包含可能遇到的情况),实现我需要的全部功能后再上大基因组。
建一个叫test.fa的文件
>test
AAACCCGAAA
AAAAAAAAAC
CGAAAACCCC
CCCCCCCCCC
CCCGAAAAC
再做一个叫test.gff的文件,则需要提取的序列为(最后一列为提取出的序列)
test 4 7 + CCCG
test 20 22 + CCG
test 27 44 + CCCCCCCCCCCCCCCCCG
test 49 49 + C
test 4 7 - CGGG
test 20 22 - GCC
test 27 44 - CGGGGGGGGGGGGGGGGG
test 49 49 - G
可以看到测试文件中只有A,C,G,提取的是包含C和G的序列。如果是+,那么提取出来的序列C多G少,如果是-,提取出来是C少G多,这样方便判断。并且文件包含了内部、换行、头部等情况。
观察以下test.fa,第一行为title,之后每行都是10(除最后一行),所以首先我们想知道title那行的长度是多少:
f=open("./test.fa",'r')
titleLen=len(f.readline())
print(titleLen)
# 输出6, ">test"还有一个换行符
我们先测试第一个
beg=4
end=7
f.seek(titleLen+beg-1) #文件句柄移动到beg前一位
print(f.read(end-beg+1))
#输出CCCG,成功!
这里如果失败了,可能是因为你在win操作,文件结尾除了\n还有\r。可以先用某种方式(tr啊或者编辑器)去除\r再进行下面的操作。
再测试下一个(完整代码):
f=open("./test.fa",'r')
titleLen=len(f.readline())
beg=20
end=22
f.seek(titleLen+beg-1)
print(f.read(end-beg+1))
#输出:
#AC
#
#,不对了
因为有换行符...beg这个值并不包括换行符,所以需要把beg修正一下。
f=open("./test.fa",'r')
titleLen=len(f.readline())
lenline=10 #每行碱基长度
beg=20
end=22
sbeg=int((beg-1)/lenline)
f.seek(titleLen+beg-1+sbeg,0)
print(f.read(end-beg+1+send).replace("\n",""))
#输出
#CC
开头对了,看来还得再矫正一下end
f=open("./test.fa",'r')
titleLen=len(f.readline())
lenline=10
beg=20
end=22
sbeg=int((beg-1)/lenline)
send=int((end-1)/lenline)-sbeg
f.seek(titleLen+beg-1+sbeg,0)
print(f.read(end-beg+1+send).replace("\n",""))
#输出CCG,对了
先不弄别的,我们把上面的打包成一个函数,那么完整的代码变成:
def getSeq(f,tlen,lenline,beg,end):
sbeg=int((beg-1)/lenline)
send=int((end-1)/lenline)-sbeg
f.seek(tlen+beg-1+sbeg,0)
return f.read(end-beg+1+send).replace("\n","")
if __name__ == '__main__': #如果是直接运行这个程序的话,运行以下代码。
f=open("./test.fa",'r')
titleLen=len(f.readline())
lenline=10
beg=4
end=7
print(getSeq(f,titleLen,lenline,beg,end))
好啦,到一个小段落了。
那么我们为什么要封装成一个函数呢?
主要是为了代码的重用以及让主体代码看起来简洁一点。
重用就是,如果你以后想完成类似的功能,就把整个函数拷贝过去(或者导入),之后传入对应格式的参数,就可以得到想要的结果。
封装成一个一个的方法后,程序就变成了搭积木一般的存在。你只需要在你的主体中搭好一个个积木,处理好输入和输出就行了。
这样做其实还有一个好处,就是方便查bug。你可以用单元测试的方法测试每一个方法,如果方法没问题,那么就认为这一块代码是没问题的,就可以专注于其他块。
回顾一下,我们先测了一个简单的情况,实现了功能;之后发现这个代码不适用于第二种情况,于是大量修改了代码。那么问题来了,这个代码是否还支持我们的第一种情况呢?于是,unittest就出现了。
4. Unittest单元测试
什么是单元测试。
简单说就是,给个例子让程序的一部分跑一下,看看结果和预期的一样不。
那么我们就可以在单元测试中加入很多例子,每次修改代码后测试一下看曾经的例子还能通过不?
import unittest
from get_seq import getSeq #之前的代码保存在get_seq.py里面
class TestDemo(unittest.TestCase):
f=open("./test.fa",'r')
titleLen=len(f.readline())
lenline=10
#写了2个简单的test
#分别是上面的两个例子
def test1(self):
beg=4
end=7
targetSeq="CCCG"
self.assertEqual(getSeq(self.f,self.titleLen,self.lenline,beg,end),targetSeq)
if __name__ == '__main__':
unittest.main()
#输出:
#..
#----------------------------------------------------------------------
#Ran 2 tests in 0.001s
#
#OK
其实,unittest的方法很多,最后评价结果的方法也很多,大家可以详细看一下。这里给python官方说明的链接:
最后,我们完善一下我们的程序:
首先,再写一个转义的函数,负责处理负链的情况
nt={
"A":"T","T":"A","G":"C","C":"G","N":"N",
"a":"t","t":"a","g":"c","c":"g","n":"n"
}
def trans(nt,seq):
seq=seq[::-1] #序列反转
nseq=""
for s in seq:
nseq = nseq + nt[s]
return nseq
最后成型的程序:
#get_seq.py
def trans(nt,seq):
seq=seq[::-1]
nseq=""
for s in seq:
nseq = nseq + nt[s]
return nseq
def getSeq(f,tlen,lenline,beg,end):
sbeg=int((beg-1)/lenline)
send=int((end-1)/lenline)-sbeg
f.seek(tlen+beg-1+sbeg,0)
return f.read(end-beg+1+send).replace("\n","")
if __name__ == '__main__':
nt={
"A":"T","T":"A","G":"C","C":"G","N":"N",
"a":"t","t":"a","g":"c","c":"g","n":"n"
}
f=open("./test.fa",'r')
titleLen=len(f.readline())
lenline=10
with open("./test.gff","r") as fgff:
for line in fgff:
tmp=line.strip().split("\t")
if (tmp[3] == "+"):
print(line.strip()+"\t"+getSeq(f,titleLen,lenline,int(tmp[1]),int(tmp[2])))
else:
print(line.strip()+"\t"+trans(nt,getSeq(f,titleLen,lenline,int(tmp[1]),int(tmp[2]))))
单元测试为:
import unittest
from get_seq import getSeq,trans
class TestDemo(unittest.TestCase):
f=open("./test.fa",'r')
titleLen=len(f.readline())
lenline=10
def test1(self):
beg=4
end=7
targetSeq="CCCG"
self.assertEqual(getSeq(self.f,self.titleLen,self.lenline,beg,end),targetSeq)
def test2(self):
beg=20
end=22
targetSeq="CCG"
self.assertEqual(getSeq(self.f,self.titleLen,self.lenline,beg,end),targetSeq)
def test3(self):
nt={
"A":"T","T":"A","G":"C","C":"G","N":"N",
"a":"t","t":"a","g":"c","c":"g","n":"n"
}
seq="AATCG"
targetSeq="CGATT"
self.assertEqual(trans(nt,seq),targetSeq)
if __name__ == '__main__':
unittest.main()
