跳至主要內容

生信中的Python——定位polyN

Haopeng Yu大约 15 分钟

马上快毕业了,师妹跟我聊说她现在程序还是不太会写,让我在撤退之前给她补救一下;同时,老婆大人突然励志要奋发努力的学程序,所以我准备推出一系列生信代码教程。主要是通过一些示例,写一下我写生信程序的思路和经验,并不包括具体的语法。第一个就写一个有关polyN定位的例子,示例来自于已经工作的孟师妹问我的一个问题,在此与大家分享。

  • 生信案例描述
  • 最直接的解决方案
  • 适用于更多情况

写在前面:

我并不想把python的语法或者什么再重复一遍,没有任何意义。现在描述python基本语法的教学网站很多,我推荐几个吧:

上面两个通常就够用了。不过一般如果语法忘了,大家可以直接搜,搜索引擎也是IDE嘛。

1. 生信案例描述

故事是这样的:

已经工作的孟师妹突然给我发消息,说有一个task她的程序跑的特别慢,由于公司需要效率,所以问我能不能优化一下程序。

场景是这样的:

她有一个基因组的fasta文件,里面包含poly N序列,现在需要把这些序列定位出来生成一个类似gff的表。

fasta示例文件如下:

>chr1
AACGNNNNACGTAC

那么,就应该生成

#label  Chr     Beg End
polyN   chr1    5   8   

简单吧,那么,游戏开始~


注意,我非常建议你先自己想一下怎么解决这个问题,写好代码再继续看下一节。这样收获更大。 同理,每一节我还会提出问题,建议还是先完成相应的代码,再看下一节。


2. 最直接的解决方案

最直接的方案就是:

  1. 一行一行把文件读出来
  2. 遇到“>”记录一下fasta的title,序列开始计算
  3. 把序列一个个读出来,看是不是“N”,如果是记录开始位置,直到不是N时输出。

这样就出现了下面的代码:

首先先建立一个叫test.fa的文件保存序列,内容为:

>chr1
AACGNNNNACGTAC

之后写个python

print("#label\tChr\tBeg\tEnd")

f=open("./test.fa",'r') #开文件
lines=f.readlines() #按照行读取,保存在名为lines的list里面
#lines[0]是大于号的那行,把大于号后面的内容输出,strip()去换行符
print("polyN\t"+lines[0][1:].strip(),end="")

#之后需要处理序列那行,设置一个小标签jump,防止重复输出
#0代表false,>0为true。
#按理说赋值数字就行,为了方便理解,我先用true和false代替
jump=True
for i in range(len(lines[1])):  #把每一个碱基拉过来遍历
    if (lines[1][i] == "N"): #如果是N,记录位置
        if (jump): #第一次遇到N,为true,记录位置;之后为false,不再记录
            print("\t"+str(i+1),end="") #记录开始位置
            jump=False #以后在上面的jump判断中不再通过,不再重复输出起始位置
    else:
        if (not jump): # 如果这个碱基不是N,并且已经开始记录N位置了(jump为False)
                        # 那么,not jump为true,记录一下结束位置。
            print("\t"+str(i))
            break                

就像预期,输出和上面示例输出一模一样:

#label  Chr     Beg     End
polyN   chr1    5       8

评价一下上面的代码。

上面内容有一个小技巧就是jump,因为遇到第一个N需要记录起始位置,直到第一个非N位点记录终止位置。

不过,这个代码已经知道了需要读取的文件有2行,第一行为名字,第二行为序列,且只有一段polyN,所以相当于针对于这个数据“定制的”代码。

这段代码可以说"泛化"极弱,换个fasta数据立刻报错。不过我写这个教程的时候并没有把这个最简单的程序这个省去,因为这也是我写程序的思路之一。

Atlas编程哲学:不要浪费时间在没有用的考虑上,一切都是为了最快的获得正确的结果。

如果某个task以后应用的场景极低,可以认为是一次性代码,用完就扔,那么就忘记“泛化”,最快速度达到目的才是最重要的。

也就是说:如果你需要在这个test.fa后面再加点序列,直接打开编辑器加就行了,不需要写一个通用的文档追加程序,完善所有功能,做成一个产品,或者自己开发个编辑器啥的......

3. 适用于更多情况

上一个程序的弊端在于,只能处理2行的,只有一段polyN的序列。真正的polyN序列往往很多段,fasta文件也有很多行,很多染色体,于是上面的程序就用不了了。

3.1 支持多染色体和多段polyN

所以首先需要增加的功能就是:

  • 支持多个染色体在一个fasta中的情况
  • 支持多段polyN
print("#label\tChr\tBeg\tEnd") #打印title

f=open("./test.fa",'r') #开文件
lines=f.readlines() #按照行读取,保存在名为lines的list里面
chrname="" #设个变量记录染色体名字
for line in lines: 
    line=line.strip() #去除一下末尾的换行
    if (line[:1] == '>'): #如果line的第一个是">"
        chrname=line[1:] #记录一下染色体名字
    else:
        beg=-1 #首先设置一个叫beg的标签,记录起始坐标
        for i in range(len(line)):  #把每一个碱基拉过来遍历
            if (line[i] == "N"): #如果是N,记录位置
                if (beg == -1):
                    beg=i+1
            else: #如果不是N,且beg已经开始记录了,那么输出前一个位置
                if (beg>0):
                    print("polyN\t"+chrname+"\t"+str(beg)+"\t"+str(i))
                    beg=-1  #最后,beg初始化,赋值-1进入下一轮循环

上段代码输出为:

#label  Chr     Beg     End
polyN   chr1    5       8

woo,和期望结果的一模一样。

同时,如果我们弄得稍微复杂点,有2段polyN,上面这段代码也是OK的。比如,test.fa改成:

>chr1
AACGNNNNACGTNNNNACGTA

则输出:

#label  Chr     Beg     End
polyN   chr1    5       8
polyN   chr1    13      16

多染色体也OK,test.fa修改成:

>chr1
AACGNNNNACGTNNNNACGTA
>chr2
AACGNNNNACGTNNNNACGTA

则输出:

#label  Chr     Beg     End
polyN   chr1    5       8
polyN   chr1    13      16
polyN   chr2    5       8
polyN   chr2    13      16

不过,如果fasta序列是这样的:

>chr1
AACGNNNNACGTNNNNACGTA
AACGNNNNACGTNNNNACGTA

额,输出是:

#label  Chr     Beg     End
polyN   chr1    5       8
polyN   chr1    13      16
polyN   chr1    5       8
polyN   chr1    13      16

也就是程序目前只能处理单行fasta,无法多行,所以需要进一步修改。

3.2 支持多行

多行,首先想到的是需要一个变量记录每行的长度,定位的位置加上这个长度不就行了。

pos=0 # for循环前定义一个pos变量

    else:
        ...#省略代码
        beg = pos + i + 1
        ...#省略代码
    pos = pos + len(line) #最后一行加这个

输出为:

#label  Chr     Beg     End
polyN   chr1    5       8
polyN   chr1    13      16
polyN   chr1    26      29
polyN   chr1    34      37

这下正常了,不过别急,如果换行呢?比如:

>chr1
ACTAGCATCGTACNNN
NNNACGTACGTACGTA

输出:

#label  Chr     Beg     End
polyN   chr1    17      19

第一行直接掠过去了(由于没有非N作为阻断),直接记录第二行,位置倒是对的。

所以,现在我们有2个策略:

  1. 把多行fasta首先转换成单行fasta,直接用前面的代码就行。
  2. 想出一个多行的策略解决这种情况

那么,应该用哪个方案呢?

Atlas编程哲学:在边界内效率最大化。

软件工程师,为啥叫工程师而不是软件科学家?因为工程师需要考虑到实际情况,而不是所有研究全部处于理论状态。

对于写代码,需要考虑的就是能用的内存、硬盘空间和CPU,以及代码运行起来的时间复杂度和空间复杂度。

也就是说:

  • 如果我所需要处理的情况,就是上面那个示例文件,我就直接肉眼数出来就行,也别写代码了。
  • 如果我需要处理较大的染色体,而内存可以塞下,那么我就采用方案1,简单粗暴,多行变一行,直接解决。
  • 如果我内存不够,我就需要采用方案2,修改策略,单行处理。

对于其他情况,通常还需要考虑自己的硬件优势

  • 如果处理很多基因组,而我的优势的多CPU,就需要对文件进行拆分进行多进程处理;或者写多线程进行处理。
  • 如果优势是大硬盘,劣势是小内存,那么要么一点一点处理,要么把内存的东西先写到硬盘,一步一步处理。

总的来说,写程序并不是黑写,需要像工程师那样考虑实际情况进行效率最大化的优化,可能在不同场景需要不同的处理策略。当然,如果硬件很好,大内存,大硬盘,主频高核还多的CPU,就可以任性,想怎么写就怎么写。

回到主题,我倾向于写多行的策略,适用范围广还挑战一下自己;不过如果平时着急出结果的代码,我会用方案1尽快出结果。

其实,改起来很简单,把beg往外圈放就好了:

print("#label\tChr\tBeg\tEnd") 

f=open("./test.fa",'r') 
lines=f.readlines() 
chrname="" 
pos=0
beg=-1 #把beg移动到这个位置
for line in lines: 
    line=line.strip() 
    if (line[:1] == '>'):
        chrname=line[1:] 
        beg = -1 # 这里还需要再初始化一下,重置一下beg和pos
        pos=0
    else:
        for i in range(len(line)): 
            if (line[i] == "N"): 
                if (beg == -1):
                    beg=pos+i+1
            else: 
                if (beg>0):
                    print("polyN\t"+chrname+"\t"+str(beg)+"\t"+str(i+pos))
                    beg=-1 
        pos = pos +len(line)
if (beg>0):
    print("polyN\t"+chrname+"\t"+str(beg)+"\t"+str(pos)) #如果末尾有polyN再输出一下

测试test.fa:

>chr1
AACGNNNNACGTNNNNACGTA
ACTAGCATCGTACNNN
NNNACGTACGTACGTA
>chr2
ACTAGCATCGTACNNN
NNNACGTACGTACNNN

输出:

#label  Chr     Beg     End
polyN   chr1    5       8
polyN   chr1    13      16
polyN   chr1    35      40
polyN   chr2    14      19
polyN   chr2    30      32

这个程序写好了,简单示例已经可以获得正确的结果,那么就可以上大基因组了。那么这时候,下一个问题就出现了。

4. 时间!!!

一段代码的复杂度在各种算法课本中可以查到,不过我这里并不想讲怎么计算这个复杂度。如果用上面这段代码去跑大基因组,那么有一个问题就会出现,程序需要跑多久?

最简单的方法就是,跑一遍不就知道多久了?所以一般比如要跑10G的数据,可以先拿10M或者100M的数据跑一遍,最后耗时按照比例增加就行了。

time 你的代码.py 

耗时需要看自己的承受能力。比如10min,1h,1day,1week等等,按照自己接受能力去优化代码。如果已经估计出来的时间可以接受,并且以后这段代码不会再用,那么就没有优化的必要。比如耗时需要1天,但是我可能通过优化可以变成半天,然鹅我可以去聚聚餐,打打球,明天这时候直接收结果就行了,为啥要优化?

不过,如果估计出来的1week或者更长,自己肯定无法接受,那么就要想办法进行优化。

我采用的优化策略一般有以下2种:

  1. 用更多的资源

包括原来用一台电脑跑一周,我开7台电脑跑一天就好了啊~或者是原来是单核的程序,可用的cpu有20核,我分上18个文件,同时跑18个不就更快;如果资源需要共享啊,交互啊什么的,可以写多线程(不过线程锁、线程间的共享似乎是个坑)

  1. 优化算法

第一种就不说了,第二种深有体会。

4.1 算法的优化

算法优化很容易就可以有量级级别的提升。

故事开始:

我记得最清楚的是,我刚进实验室时,师兄F需要做1000w个数无重复依次抽取100w个数,他rand(1000w)后,抽一个就和抽出来的判断一下是否抽重了,如果重了重新抽,于是这段代码写出来越来越慢直至卡死;之后我生成1000w的数组,之后从里面抽一个删一个(当时想的是从口袋中抽球),估计4-6小时可以跑完;最后,我师兄M通过数组元素交换的方法,写的程序3秒跑完。这次对我印象太深刻了,好的算法真的可以带来量级上的提升。

我现在一般用到的算法优化策略有以下几种:

  1. 避免循环嵌套。
  2. 避免不必要的资源浪费。比如多余的变量,多余的数组和链表等。部分编程语言有较好的垃圾回收机制可以避免这种内存占用,不过我还是习惯清空不用数组,关闭打开的文件等减少内存占用。
  3. 数组利于按index查询不利于遍历搜索和增删,链表利于增删不利于遍历,哈希键值查询极快等。利用各种数据结构的优势进行代码编写。
  4. 引用我大学的计算机老师勇哥的话:“人想的多,计算机运算就快”。比如1到10亿的加和可以遍历求和也可以直接用高斯的等差数列公式,很明显后者快。
  5. 尽量把大程序拆成小块,小的子程序,这样也比较容易写单元测试。
  6. 字符串查询上,通常正则表达式要快=-=
  7. linux比Win快=-=
  8. linux上用shell比Perl和Python快=-=

那么,对于上面我们写的这个算法,我试了一下我的电脑上567M的fasta需要跑1m12s。可能这个属于我前面说的无需优化的类型,接杯水回来就跑完了。

然鹅,如果需要跑1T的数据,就需要36.9小时,一天半。所以,按照上面的策略5,我们试试正则表达式。

4.2 上,正则表达式!

最早孟师妹问我的时候,她采用的代码就是前面我说的(我估计是的,因为是通常的思路就是这样的),她觉得太慢了。

于是我第一反应就是,既然是匹配,那么上正则表达式呗。不过前提条件就是,需要内存足够大,序列需要放一行。如果是多行fasta正则表达式匹配,遇到3中提到的多行的问题还不够麻烦的=-=,效率也不见得能提高多少。

所以,孟师妹修改后的代码:

import re

print("#label\tChr\tBeg\tEnd") 
#这里直接采用孟师妹的函数,把fasta的名那行变为偶数行,序列变为奇数行,首行为空
def ToSingle(fasta_file):
    str_my = open(fasta_file).read() #文件全部读到内存
    first_res = re.sub(r'(>.*)\n', r'@\1@', str_my) #正则表达式替换,把开头有>也就是标题行用两个@包围
    sec_res = first_res.replace('\n', '') #去除全部换行符(这里其实没考虑到windows的\r)
    res = sec_res.replace('@', '\n') #把刚才标记的@变成换行
    return res

file_arr = ToSingle(sys.argv[1]).split("\n") #用\n分割文件,生成数组
reg = re.compile(r'N+',re.I) #编译正则表达式,多个N,忽略大小写
chrom = ""
for index in range(1,len(file_arr)):
    if (file_arr[index].startswith(">")):
        chrom = file_arr[index].replace('>', '') #获得染色体名
    else:
        for sub_str in reg.finditer(file_arr[index]):
            start, end = sub_str.span()
            start = start+1
            print(f"polyN\t{chrom}\t{start}\t{end}") #3.6新增的格式化输出方式

这样,我再运行刚才567Mfasta,只用了15s,提高了4.8倍。1T数据时间可以从36.9小时减少到7.7小时。

最后思考,如果一行序列内存放不下怎么办? 分多行?之后再判断切割位点附近序列有没有polyN?

所以,可以看到,写程序也有这个特点,有钱有资源就可以少折腾,233333。

5. 应用性和泛性

既然我们已经得到了一个差不多高效的程序了,还需要做什么?

当然是整理好代码以后还可以继续用啊。也就是这个程序要适应各种不同的情况,而不只是test.fa。此外,如果其他人也想用这个软件,他们没亲自编写程序,于是就有可能犯各种各样的错误。

所以,我通常的做法就是,把上面的算法“打包”,也就是认为“找polyA”是一个箱子,只要输入fasta文件,他就能输出polyN的位置。那么,我就需要把fasta文件给这个箱子而不是给个gbk的文件。

那么,就需要用到:

  1. 显示文件用法readme,设置参数的包:argparse, sys.argv
  2. Biopython里面完善的读fasta文件的包: SeqIO

等。这里我就不再给出具体的代码了。

当然,如果想让程序更易用,也可以加入更多的功能。比如有人需要获得"非polyN"序列的位置,也就是把正则表达式编译那里的

    reg = re.compile(r'[^N]+',re.I)
    #不过这里匹配非N,也许数字或者其他奇奇怪怪的也会进来哦。
    #所以需要输入数据有质控。

等等,会有很多很多需要完善的功能,而在你完善的过程中,你的程序能力就会飞速的提升。

好啦,今天这个例子就到这里,总结一下。

这个例子主要说了我写程序的一些思路和优化经验,并展示了正则表达式。

希望大家有所收获,欢迎与我交流。

博主简介
博主简介