ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Nextflow patterns

2021-10-21 15:33:10  阅读:190  来源: 互联网

标签:fq ch process patterns Nextflow file input foo


Nextflow patterns

1 Basic Patterns

1.1 Channel duplication

P:需要在两个或多个进程中使用相同的通道作为输入

S:使用into运算符创建源通道的两个(或更多)副本。然后,使用新通道作为流程的输入。

代码:

Channel
    .fromPath('prots/*_?.fa')
    .into { prot1_ch; prot2_ch }

process foo {
  input: file x from prot1_ch
  script:
  """
    echo your_command --input $x
  """
}

process bar {
  input: file x from prot2_ch
  script:
  """
    your_command --input $x
  """
}

2 Scatter executions

2.1 Process per file path

P:需要为每个匹配 glob 模式的文件执行一个任务

S:使用Channel.fromPath方法创建一个通道,发出与 glob 模式匹配的所有文件。然后,使用通道作为执行任务的流程的输入。

代码:

Channel.fromPath('reads/*_1.fq.gz').set{ samples_ch }

process foo {
  input:
  file x from samples_ch

  script:
  """
  your_command --input $x
  """
}

2.2 Process per file chunk

P:需要将一个或多个输入文件拆分为块并为每个文件执行一项任务

S:使用splitText运算符将文件拆分为给定大小的块。然后将结果通道用作执行任务的流程的输入

代码:

Channel
    .fromPath('poem.txt')
    .splitText(by: 5)
    .set{ chunks_ch }

process foo {
  echo true
  input:
  file x from chunks_ch

  script:
  """
  rev $x | rev
  """
}

2.3 Process per file pairs

P:需要将文件处理到按对分组的目录中

S:使用Channel.fromFilePairs方法创建一个通道,该通道发出与 glob 模式匹配的文件对。该模式必须匹配成对文件名中的公共前缀。匹配文件作为元组发出,其中第一个元素是匹配文件的分组键,第二个元素是文件对本身。

代码:

Channel
    .fromFilePairs('reads/*_{1,2}.fq.gz')
    .set { samples_ch }

process foo {
  input:
  set sampleId, file(reads) from samples_ch

  script:
  """
  your_command --sample $sampleId --reads $reads
  """
}
  • 自定义分组策略

需要时,可以定义自定义分组策略。一个常见的用例是对齐 BAM 文件 ( sample1.bam) 随附的索引文件。困难在于索引有时会被调用sample1.bai,有时sample1.bam.bai取决于所使用的软件。下面的例子可以适应这两种情况。

代码:

Channel
    .fromFilePairs('alignment/*.{bam,bai}') { file -> file.name.replaceAll(/.bam|.bai$/,'') }
    .set { samples_ch }

process foo {
  input:
  set sampleId, file(bam) from samples_ch

  script:
  """
  your_command --sample $sampleId --bam ${sampleId}.bam
  """
}

2.4 Process per file range

P:需要在具有共同索引范围的两个或更多系列文件上执行任务

S:使用from方法定义重复执行任务的范围,然后将其与map运算符链接以将每个索引与相应的输入文件相关联。最后使用结果通道作为过程的输入

代码:

Channel
  .from(1..23)
  .map { chr -> tuple("sample$chr", file("/some/path/foo.${chr}.indels.vcf"), file("/other/path/foo.snvs.${chr}.vcf")) }
  .set { pairs_ch }


process foo {
  tag "$sampleId"

  input:
  set sampleId, file(indels), file(snps) from pairs_ch

  """
  echo foo_command --this $indels --that $snps
  """
}

2.5 Process per CSV record

P:需要为一个或多个 CSV 文件中的每条记录执行一项任务

S:使用splitCsv运算符逐行读取 CSV 文件,然后使用map运算符返回每行所需字段的元组,并使用该file函数将任何字符串路径转换为文件路径对象。最后使用结果通道作为过程的输入

index.csv

sampleIdread 1read2
FC816RLABXXread/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_1.fq.gzread/110101_I315_FC816RLABXX_L1_HUMrutRGXDIAAPE_2.fq.gz
FC812MWABXXread/110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_1.fq.gzread110105_I186_FC812MWABXX_L8_HUMrutRGVDIABPE_2.fq.gz
FC81DE8ABXXread/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_1.fq.gzread/110121_I288_FC81DE8ABXX_L3_HUMrutRGXDIAAPE_2.fq.gz
FC81DB5ABXXread/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_1.fq.gzread/110122_I329_FC81DB5ABXX_L6_HUMrutRGVDIAAPE_2.fq.gz
FC819P0ABXXread/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_1.fq.gzread/110128_I481_FC819P0ABXX_L5_HUMrutRGWDIAAPE_2.fq.gz

代码:

params.index = 'index.csv'

Channel
    .fromPath(params.index)
    .splitCsv(header:true)
    .map{ row-> tuple(row.sampleId, file(row.read1), file(row.read2)) }
    .set { samples_ch }

process foo {
    input:
    set sampleId, file(read1), file(read2) from samples_ch

    script:
    """
    echo your_command --sample $sampleId --reads $read1 $read2
    """
}

2.6 Process per file output

P:工作流中的任务一次生成两个或更多文件。下游任务需要独立处理这些文件中的每一个

S:使用flatten运算符将上游进程的输出转换为单独发送到每个文件的通道。然后将此通道用作下游过程的输入

代码:

process foo {
  output:
  file '*.txt' into foo_ch
  script:
  '''
  echo Hello there! > file1.txt
  echo What a beautiful day > file2.txt
  echo I wish you are having fun1 > file3.txt
  '''
}

process bar {
  input:
  file x from foo_ch.flatten()
  script:
  """
  cat $x
  """
}

3 Gather results

3.1 Process all outputs altogether

P:需要完全处理上游任务的所有输出

S:使用collect运算符收集上游任务产生的所有输出,并将它们作为唯一输出发出。然后使用结果通道作为过程的输入输入

代码:

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}

process bar {
  echo true
  input:
  file '*.fq' from unzipped_ch.collect()
  """
  cat *.fq
  """
}

3.2 Process outputs into groups

P:需要在同一批次中处理文件名中具有匹配键的所有文件

S:使用map运算符将每个文件关联一个从文件名中提取的键。然后将结果通道与groupTuple运算符链接起来,将所有具有匹配键的文件组合在一起。最后使用结果通道作为过程的输入。

代码:

Channel
    .fromPath('reads/*')
    .map { file ->
        def key = file.name.toString().tokenize('_').get(0)
        return tuple(key, file)
     }
    .groupTuple()
    .set{ groups_ch }


process foo {
  input:
  set key, file(samples) from groups_ch

  script:
  """
  echo your_command --batch $key --input $samples
  """
}

3.3 Collect outputs into a file

P:需要将上游进程生成的所有输出文件连接到一个文件中

S:使用collectFile运算符将所有输出文件合并为一个文件

代码:

Channel.fromPath('reads/*_1.fq.gz').set { samples_ch }

process foo {
  input:
  file x from samples_ch
  output:
  file 'file.fq' into unzipped_ch
  script:
  """
  < $x zcat > file.fq
  """
}

unzipped_ch
      .collectFile()
      .println{ it.text }

4 Organize outputs

4.1 Store process outputs

P:需要将一个或多个进程的输出存储到您选择的目录结构中

S:使用publishDir指令设置一个自定义目录,在该目录中需要提供流程输出

代码:

params.reads = 'reads/*{1,2}.fq.gz'
params.outdir = 'my-results'

Channel.fromFilePairs(params.reads).set{ samples_ch }

process foo {
  publishDir "$params.outdir/$sampleId"
  input:
  set sampleId, file(samples) from samples_ch
  output:
  file '*.fq'

  script:
  """
  < ${samples[0]} zcat > sample_1.fq
  < ${samples[1]} zcat > sample_2.fq
  """
}

4.2 Store outputs matching a glob pattern

P:工作流中的任务会创建下游任务所需的许多输出文件。可根据文件名将其中一些文件存储到单独的目录中

S:使用两个或多个publishDir指令将输出文件存储到单独的存储路径中。对于每个指令,使用选项指定一个不同的 glob 字符串,pattern仅将与提供的模式匹配的文件存储到每个目录中

代码:

Channel
    .fromFilePairs(params.reads, flat: true)
    .set{ samples_ch }

process foo {
  publishDir "$params.outdir/$sampleId/counts", pattern: "*_counts.txt"
  publishDir "$params.outdir/$sampleId/outlooks", pattern: '*_outlook.txt'
  publishDir "$params.outdir/$sampleId/", pattern: '*.fq'

  input:
    set sampleId, file('sample1.fq.gz'), file('sample2.fq.gz') from samples_ch
  output:
    file "*"
  script:
  """
    < sample1.fq.gz zcat > sample1.fq
    < sample2.fq.gz zcat > sample2.fq

    awk '{s++}END{print s/4}' sample1.fq > sample1_counts.txt
    awk '{s++}END{print s/4}' sample2.fq > sample2_counts.txt

    head -n 50 sample1.fq > sample1_outlook.txt
    head -n 50 sample2.fq > sample2_outlook.txt
  """
}

4.3 Rename process outputs

P:需要将进程的输出存储到一个目录中,为文件指定一个您选择的名称

S:publishDir 允许在过程输出存储在所选择的目录。指定saveAs参数为每个文件提供您选择的名称,证明自定义规则作为闭包(closure)

代码:

process foo {
 publishDir 'results', saveAs: { filename -> "foo_$filename" }

 output:
 file '*.txt'

 '''
 touch this.txt
 touch that.txt
 '''
}
  • 若保存在子目录中:

可以使用相同的模式将特定文件存储在不同的目录中,具体取决于实际名称

代码:

process foo {
 publishDir 'results', saveAs: { filename -> filename.endsWith(".zip") ? "zips/$filename" : filename }

 output:
 file '*'

 '''
 touch this.txt
 touch that.zip
 '''
}

5 Other

5.1 Get process work directory

P:需要当前任务工作目录的显式路径

S:使用$PWDBash 变量或pwd命令检索任务工作目录路径

代码:

process foo {
  echo true
  script:
  """
  echo foo task path: \$PWD
  """
}

process bar {
  echo true
  script:
  '''
  echo bar task path: $PWD
  '''
}

注意:$当命令脚本包含在双引号字符中时,请确保使用转义变量占位符,如上在双引号中的$前加上了\来进行了转义

  • 使用以下命令为相同的脚本提供一些输入文件,以防止进程被执行
nextflow run patterns/process-get-workdir.nf --inputs ../data/prots/\*

5.2 Ignore failing process

P:预期任务在特定条件下会失败,由此希望忽略失败并继续执行工作流中的剩余任务

S:使用 process指令 errorStrategy 'ignore'忽略错误条件

代码:

process foo {
  errorStrategy 'ignore'
  script:
  '''
    echo This is going to fail!
    exit 1
  '''
}

process bar {
  script:
  '''
  echo OK
  '''
}

5.3 Mock dependency

P:需要同步两个没有直接输入输出关系的进程bar的执行,以便该进程仅在 process 完成后执行foo

S:将foo产生标志值的通道添加到进程的输出中。然后将此通道用作进程的输入以bar在其他进程完成时触发其执行

代码:

Channel
    .fromPath('.data/reads/*.fq.gz')
    .set{ reads_ch }

process foo {
    output:
    val true into done_ch

    script:
    """
    your_command_here
    """
}

process bar {
    input:
    val flag from done_ch
    file fq from reads_ch

    script:
    """
    other_commad_here --reads $fq
    """
}

6 Advanced patterns

6.1 Conditional resources definition

P:工作流中的任务需要使用一定量的计算资源,例如。内存取决于一个或多个输入文件的大小或名称。

S:使用闭包以动态方式声明资源需求,例如memorycpus等。闭包使用size流程定义中声明的输入的文件属性(例如等),来计算所需的资源量。

代码:

Channel
    .fromPath('reads/*_1.fq.gz')
    .set { reads_ch }

process foo {
    memory { reads.size() < 70.KB ? 1.GB : 5.GB }

    input:
    file reads from reads_ch

    """
    your_command_here --in $reads
    """
}

6.2 Conditional process executions

P:两个不同的任务需要以互斥的方式执行,那么第三个任务应该对前一次执行的结果进行后处理。

S:使用when语句有条件地执行两个不同的进程。每个进程声明自己的输出通道。然后使用mix运算符创建一个新通道,该通道将发出两个进程产生的输出,并将其用作第三个进程的输入。

代码:

params.flag = false

process foo {
  output:
  file 'x.txt' into foo_ch
  when:
  !params.flag

  script:
  '''
  echo foo > x.txt
  '''
}

process bar {
  output:
  file 'x.txt' into bar_ch
  when:
  params.flag

  script:
  '''
  echo bar > x.txt
  '''
}

process omega {
  echo true
  input:
  file x from foo_ch.mix(bar_ch)

  script:
  """
  cat $x
  """
}

可根据flag的不同来选择不同的进程执行,执行fooomega

nextflow run patterns/conditional-process.nf

执行baromega

nextflow run patterns/conditional-process.nf --flag

结果:

peng@sin-try2:~/patterns$ nextflow run conditional-process.nf   
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process.nf` [soggy_hypatia] - revision: 1b07ba0b38
executor >  local (2)
[82/ea867d] process > foo       [100%] 1 of 1 ✔
[-        ] process > bar       -
[38/be26fb] process > omega (1) [100%] 1 of 1 ✔
foo

peng@sin-try2:~/patterns$ nextflow run conditional-process.nf --flag
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process.nf` [astonishing_goldwasser] - revision: 1b07ba0b38
executor >  local (2)
[-        ] process > foo       -
[7f/158b8c] process > bar       [100%] 1 of 1 ✔
[05/b06073] process > omega (1) [100%] 1 of 1 ✔
bar
  • 有条件地正常(使用数据)或作为通道创建输入 通道。使用各个输入通道的过程仅在通道被填充时才会执行。每个进程仍然声明自己的输出通道。然后使用mix运算符创建一个新通道,该通道将发出两个进程产生的输出,并将其用作第三个进程的输入

代码:

params.flag = false

(foo_inch, bar_inch) = ( params.flag
                     ? [ Channel.empty(), Channel.from(1,2,3) ]
                     : [ Channel.from(4,5,6), Channel.empty() ] )

process foo {

  input:
  val(f) from foo_inch

  output:
  file 'x.txt' into foo_ch

  script:
  """
  echo $f > x.txt
  """
}

process bar {
  input:
  val(b) from bar_inch

  output:
  file 'x.txt' into bar_ch

  script:
  """
  echo $b > x.txt
  """
}

process omega {
  echo true
  input:
  file x from foo_ch.mix(bar_ch)

  script:
  """
  cat $x
  """
}

运行结果:

peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf 
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process2.nf` [naughty_minsky] - revision: 296937c5d2
executor >  local (6)
[0b/8183d2] process > foo (3)   [100%] 3 of 3 ✔
[-        ] process > bar       -
[e8/e9334f] process > omega (3) [100%] 3 of 3 ✔
6

5

4

peng@sin-try2:~/patterns$ nextflow run conditional-process2.nf --flag
N E X T F L O W  ~  version 21.04.0-edge
Launching `conditional-process2.nf` [disturbed_goldwasser] - revision: 296937c5d2
executor >  local (6)
[-        ] process > foo       -
[aa/907692] process > bar (3)   [100%] 3 of 3 ✔
[c1/e15b2f] process > omega (3) [100%] 3 of 3 ✔
1

2

3

6.3 Skip process execution

P:工作流程中有两个连续的任务,当指定了可选标志时,不应执行第一个任务,其输入由第二个任务处理。

S:使用条件表达式中创建的空通道,在指定可选参数时跳过第一个执行流程。然后将第二进程的输入定义为第一进程的输出(当被执行时)与输入信道的mix

代码:

params.skip = false
params.input = "$baseDir/sample.fq.gz"

Channel.fromPath(params.input).set{ input_ch }

(foo_ch, bar_ch) = ( params.skip
                 ? [Channel.empty(), input_ch]
                 : [input_ch, Channel.empty()] )

process foo {
  input:
  file x from foo_ch

  output:
  file('*.fastq') into optional_ch

  script:
  """
  < $x zcat > ${x.simpleName}.fastq
  """
}

process bar {
  echo true
  input:
  file x from bar_ch.mix(optional_ch)
  """
  echo your_command --input $x
  """
}

6.4 Feedback loop☆

P:需要重复执行一个或多个任务,使用输出作为新迭代的输入,直到达到特定条件(i=o=i…)

S:使用迭代循环中最后一个进程的输出作为第一个进程的输入。使用Channel.create方法显式创建输出通道。然后将过程输入定义为初始输入和过程输出的mix,该过程输出应用于定义终止条件的until运算符

代码;

params.input = 'hello.txt'

condition = { it.readLines().size()>3 }
feedback_ch = Channel.create()
input_ch = Channel.fromPath(params.input).mix( feedback_ch.until(condition) )

process foo {
    input:
    file x from input_ch
    output:
    file 'foo.txt' into foo_ch
    script:
    """
    cat $x > foo.txt
    """
}

process bar {
    input:
    file x from foo_ch
    output:
    file 'bar.txt' into feedback_ch
    file 'bar.txt' into result_ch
    script:
    """
    cat $x > bar.txt
    echo World >> bar.txt
    """
}

result_ch.last().println { "Result:\n${it.text.indent(' ')}" }

6.5 Optional input

P:一个或多个进程有一个可选的输入文件

S:使用特殊的文件名来标记文件参数的缺失

代码:

params.inputs = 'prots/*{1,2,3}.fa'
params.filter = 'NO_FILE'

prots_ch = Channel.fromPath(params.inputs)
opt_file = file(params.filter)

process foo {
  input:
  file seq from prots_ch
  file opt from opt_file

  script:
  def filter = opt.name != 'NO_FILE' ? "--filter $opt" : ''
  """
  your_commad --input $seq $filter
  """
}

6.6 Optional output

P:在某些情况下,工作流中的任务预计不会创建输出文件

S:将此类输出声明为optional文件

代码:

process foo {
  output:
  file 'foo.txt' optional true into foo_ch

  script:
  '''
  your_command
  '''
}

6.7 Execute when empty

P:如果通道为空,您需要执行一个过程

S:使用ifEmpty运算符发出标记值以触发流程的执行

代码:

process foo {
  input:
  val x from ch.ifEmpty { 'EMPTY' }
  when:
  x == 'EMPTY'

  script:
  '''
  your_command
  '''
}

标签:fq,ch,process,patterns,Nextflow,file,input,foo
来源: https://blog.csdn.net/qq_40202164/article/details/120887557

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有