ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Pipeline 的实现与arraya_reduce的妙用

2022-06-22 01:32:35  阅读:187  来源: 互联网

标签:function Pipeline return pipes reduce next pipe arraya data


管道的一般逻辑

  • 可以动态的添加管道处理命令
  • 传入初始的数据,依次经过管道处理
  • 满足一定条件跳出管道,否则经由下一个管道命令处理

简单实现

function pipeA($in) {
  if ($condition) {
    return 'break';
  }
  return do_something($in);
}
// function pipeB
// function pipeC
$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'before pipe';
foreach ($pipes as $pipe) {
  $data = $pipe($data);
  if ($condition_break) {
    break;
  }
}
printf("after pipe: %s", $data);

代码看起来没啥问题,除了一点小瑕疵。我们注意到pipeA函数里始终需要有一个返回标识来告诉Pipe什么时候需要进入下一个管道命令,什么时候需要跳出。我们可以改进一下:

function pipeA($in) {
  if ($condition) {
    return ['break', 'return value'];
  }
  return ['continue', 'prepare to next'];
}
// function pipeB
// function pipeC
$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'before pipe';
foreach ($pipes as $pipe) {
  list($flag, $data) = $pipe($data);
  if ($flag == 'break') {
    break;
  }
}
printf("after pipe: %s", $data);

这次看起来问题得到了解决,然而在工程上很容易产生问题:管道函数会由不同的开发人员定义,他们可能不能严格遵循上面这种苛刻的返回结果。我们希望有一种更简单明确的管道写法,满足条件则返回,否则进入下一个管道。例如传入闭包就很不错:

function pipe($data, $next) {
  if ($condition) {
    return $result;
  } else {
    // process by next pipe
    return $next($data);
  }
}

可是这样一来,我们的管道处理过程应该怎么写呢?我们发现,这个$next闭包,实际上嵌套包含了后面所有的管道处理函数:

pipeA()
else $nextA()

$nextA = function(){
  pipeB()
  else $nextB()
};
$nextB = function() {
  pipeC()
  else $nextC()
}
...

也就是说,我们需要从最后一个管道命令开始嵌套定义这个闭包,然后再执行闭包函数:


function pipeA($in, $next) {
  if (!random_int(0, 3)) {
    return $in . ' break in pipe A';
  }
  return $next($in.' through A');
}
function pipeB($in, $next) {
  if (!random_int(0, 2)) {
    return $in . ' break in pipe B';
  }
  return $next($in.' through B');
}
function pipeC($in, $next) {
  if (!random_int(0, 1)) {
    return $in . ' break in pipe C';
  }
  return $next($in.' through C');
}

$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'Dapianzi';

// 创建闭包
function create_next($next, $curr) {
  // 返回一个闭包,接收待处理的管道数据
  return function($data) use ($next, $curr) {
    // 将待处理的数据和下一个管道命令传入当前管道命令
    return $curr($data, $next);
  };
}
// 初始闭包 $next
// 实际上最后执行
$next = function($data) {
  return $data . ' pipe done';
};

// 从尾巴开始定义闭包$next
$pipes = array_reverse($pipes);
foreach ($pipes as $pipe) {
  $next = create_next($next, $pipe);
}

$data = $next($data);

printf("after pipe: %s", $data);

我们有一个初始化的$next, 依次访问$pipes数组的每个元素,最后返回一个$next。如果你有印象的话,马上就会想到这不就是array_reduce吗?

// function pipeA, pipeB, pipeC

$pipes = ['pipeA', 'pipeB', 'pipeC'];
$data = 'Dapianzi';

$pipeline = array_reduce(array_reverse($pipes), function($next, $curr) {
    return function($data) use ($next, $curr) {
        return $curr($data, $next);
    };
}, function($data) {
    return $data . ' pipe done';
});

$data = $pipeline($data);

printf("after pipe: %s", $data);

至此,一个简单的Pipeline就完成了。

比较Pipeline源码

最后贴一个Laravel中对Pipeline实现的源码,跟上面不能说大同小异吧,简直就是一摸一样(确信)。

<?php

namespace Illuminate\Pipeline;

use Closure;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Pipeline\Pipeline as PipelineContract;
use RuntimeException;
use Throwable;

class Pipeline implements PipelineContract
{
    /**
     * The container implementation.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * The object being passed through the pipeline.
     *
     * @var mixed
     */
    protected $passable;

    /**
     * The array of class pipes.
     *
     * @var array
     */
    protected $pipes = [];

    /**
     * The method to call on each pipe.
     *
     * @var string
     */
    protected $method = 'handle';

    /**
     * Create a new class instance.
     *
     * @param  \Illuminate\Contracts\Container\Container|null  $container
     * @return void
     */
    public function __construct(Container $container = null)
    {
        $this->container = $container;
    }

    /**
     * Set the object being sent through the pipeline.
     *
     * @param  mixed  $passable
     * @return $this
     */
    public function send($passable)
    {
        $this->passable = $passable;

        return $this;
    }

    /**
     * Set the array of pipes.
     *
     * @param  array|mixed  $pipes
     * @return $this
     */
    public function through($pipes)
    {
        $this->pipes = is_array($pipes) ? $pipes : func_get_args();

        return $this;
    }

    /**
     * Set the method to call on the pipes.
     *
     * @param  string  $method
     * @return $this
     */
    public function via($method)
    {
        $this->method = $method;

        return $this;
    }

    /**
     * Run the pipeline with a final destination callback.
     *
     * @param  \Closure  $destination
     * @return mixed
     */
    public function then(Closure $destination)
    {
        $pipeline = array_reduce(
            array_reverse($this->pipes()), $this->carry(), $this->prepareDestination($destination)
        );

        return $pipeline($this->passable);
    }

    /**
     * Run the pipeline and return the result.
     *
     * @return mixed
     */
    public function thenReturn()
    {
        return $this->then(function ($passable) {
            return $passable;
        });
    }

    /**
     * Get the final piece of the Closure onion.
     *
     * @param  \Closure  $destination
     * @return \Closure
     */
    protected function prepareDestination(Closure $destination)
    {
        return function ($passable) use ($destination) {
            try {
                return $destination($passable);
            } catch (Throwable $e) {
                return $this->handleException($passable, $e);
            }
        };
    }

    /**
     * Get a Closure that represents a slice of the application onion.
     *
     * @return \Closure
     */
    protected function carry()
    {
        return function ($stack, $pipe) {
            return function ($passable) use ($stack, $pipe) {
                try {
                    if (is_callable($pipe)) {
                        // If the pipe is a callable, then we will call it directly, but otherwise we
                        // will resolve the pipes out of the dependency container and call it with
                        // the appropriate method and arguments, returning the results back out.
                        return $pipe($passable, $stack);
                    } elseif (! is_object($pipe)) {
                        [$name, $parameters] = $this->parsePipeString($pipe);

                        // If the pipe is a string we will parse the string and resolve the class out
                        // of the dependency injection container. We can then build a callable and
                        // execute the pipe function giving in the parameters that are required.
                        $pipe = $this->getContainer()->make($name);

                        $parameters = array_merge([$passable, $stack], $parameters);
                    } else {
                        // If the pipe is already an object we'll just make a callable and pass it to
                        // the pipe as-is. There is no need to do any extra parsing and formatting
                        // since the object we're given was already a fully instantiated object.
                        $parameters = [$passable, $stack];
                    }

                    $carry = method_exists($pipe, $this->method)
                                    ? $pipe->{$this->method}(...$parameters)
                                    : $pipe(...$parameters);

                    return $this->handleCarry($carry);
                } catch (Throwable $e) {
                    return $this->handleException($passable, $e);
                }
            };
        };
    }

    /**
     * Parse full pipe string to get name and parameters.
     *
     * @param  string  $pipe
     * @return array
     */
    protected function parsePipeString($pipe)
    {
        [$name, $parameters] = array_pad(explode(':', $pipe, 2), 2, []);

        if (is_string($parameters)) {
            $parameters = explode(',', $parameters);
        }

        return [$name, $parameters];
    }

    /**
     * Get the array of configured pipes.
     *
     * @return array
     */
    protected function pipes()
    {
        return $this->pipes;
    }

    /**
     * Get the container instance.
     *
     * @return \Illuminate\Contracts\Container\Container
     *
     * @throws \RuntimeException
     */
    protected function getContainer()
    {
        if (! $this->container) {
            throw new RuntimeException('A container instance has not been passed to the Pipeline.');
        }

        return $this->container;
    }

    /**
     * Set the container instance.
     *
     * @param  \Illuminate\Contracts\Container\Container  $container
     * @return $this
     */
    public function setContainer(Container $container)
    {
        $this->container = $container;

        return $this;
    }

    /**
     * Handle the value returned from each pipe before passing it to the next.
     *
     * @param  mixed  $carry
     * @return mixed
     */
    protected function handleCarry($carry)
    {
        return $carry;
    }

    /**
     * Handle the given exception.
     *
     * @param  mixed  $passable
     * @param  \Throwable  $e
     * @return mixed
     *
     * @throws \Throwable
     */
    protected function handleException($passable, Throwable $e)
    {
        throw $e;
    }
}

标签:function,Pipeline,return,pipes,reduce,next,pipe,arraya,data
来源: https://www.cnblogs.com/dapianzi/p/16399006.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有