Jonathan Boccara's blog

Composite Pipes, part 1: Decoupling Operators From Classes

Published September 17, 2019

One of the things you would expect C++ pipes to do, and that they couldn’t do until recently, is to create reusable composite pipes.

Indeed, we could chain several pipes into a complete pipeline:

input >>= pipes::filter([](int i) { return i % 2 == 0; })
      >>= pipes::transform([](int i ){ return i * 2; })
      >>= back_inserter(results);

But we couldn’t create a partial pipeline to use it later:

auto compositePipe = pipes::filter([](int i) { return i % 2 == 0; })
                 >>= pipes::transform([](int i ){ return i * 2;}); // doesn't compile!

input >>= compositePipe >>= back_inserter(results);

However, this is a feature that would be natural for the library.

To implement this feature, we need to clarify the internal interfaces of the library and decouple operator>>= from the classes it works with.

This was an instructive mini-project, which I think is worth sharing.

The previous architecture

Here is a brief description of the previous architecture, to clarify our starting point.

The various pipes are represented by classes that contain their transformation plus the rest of the pipeline. For example, the implementation of the transform pipe looks like this:

template<typename Function, typename NextPipe>
class transform_pipe : public OutputIteratorBase<transform_pipe<Function, NextPipe>>
{
public:
    template<typename T>
    void onReceive(T&& input)
    {
        send(nextPipe_, function_(input));
    }

    explicit transform_pipe(Function function, NextPipe nextPipe) : function_(function), nextPipe_(nextPipe) {}
    
private:
    Function function_;
    NextPipe nextPipe_;
};

The OutputIteratorBase CRTP base class ensures compatibility with STL algorithms, and calls onReceive when a piece of data is sent to the pipe with the send function. Then transform_pipe uses the send function to pass the transformed data on to the next step of the pipeline.
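To make this more concrete, here is a minimal sketch of how a CRTP base class can give a pipe the interface of an output iterator. It is a simplified stand-in written for this explanation, not the library's actual code: assignment_proxy, doubling_sink and copy_through_sink are hypothetical names, and the real OutputIteratorBase may well be organised differently.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

// Hypothetical proxy returned by operator*: assigning to it calls onReceive.
template<typename Pipeline>
struct assignment_proxy
{
    Pipeline& pipeline;

    template<typename T>
    assignment_proxy& operator=(T&& input)
    {
        pipeline.onReceive(std::forward<T>(input));
        return *this;
    }
};

// Simplified stand-in for OutputIteratorBase (an assumption, not the library's code).
template<typename Derived>
struct OutputIteratorBase
{
    // Member types that STL algorithms expect from an output iterator.
    using iterator_category = std::output_iterator_tag;
    using value_type = void;
    using difference_type = std::ptrdiff_t;
    using pointer = void;
    using reference = void;

    // `*it = value` goes through the proxy; incrementing does nothing.
    assignment_proxy<Derived> operator*() { return assignment_proxy<Derived>{derived()}; }
    Derived& operator++() { return derived(); }
    Derived& operator++(int) { return derived(); }

private:
    Derived& derived() { return static_cast<Derived&>(*this); }
};

// send is the other way to push data into a pipe.
template<typename Pipeline, typename T>
void send(Pipeline& pipeline, T&& input)
{
    pipeline.onReceive(std::forward<T>(input));
}

// Hypothetical terminal step: stores every received value, doubled.
class doubling_sink : public OutputIteratorBase<doubling_sink>
{
public:
    explicit doubling_sink(std::vector<int>& out) : out_(&out) {}

    template<typename T>
    void onReceive(T&& input) { out_->push_back(input * 2); }

private:
    std::vector<int>* out_;
};

inline std::vector<int> copy_through_sink(std::vector<int> const& input)
{
    std::vector<int> results;
    doubling_sink sink{results};
    send(sink, 10);                               // direct use
    std::copy(input.begin(), input.end(), sink);  // use through an STL algorithm
    return results;
}
```

Both routes end up in onReceive, which is the only function a concrete pipe has to write.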

Let’s now have a look at the implementation of operator>>=:

template<typename Function, typename NextPipe>
transform_pipe<Function, NextPipe> operator>>= (TransformFunctionWrapper<Function> const& transformFunctionWrapper, NextPipe const& nextPipe)
{
    return transform_pipe<Function, NextPipe>{transformFunctionWrapper.function, nextPipe};
}

This introduces a new class, TransformFunctionWrapper. That’s the result of calling transform with a function. TransformFunctionWrapper merely stores that function, so that operator>>= can access it:

template<typename Function>
struct TransformFunctionWrapper{ Function function; };

template<typename Function>
TransformFunctionWrapper<Function> transform(Function&& function)
{
    return TransformFunctionWrapper<Function>{function};
}

Why this design doesn’t work for composite pipes

This code allows us to write a pipeline that includes all the elements up to the end:

    pipes::transform([](int i){ return i * 2; })
>>= back_inserter(results);

This returns a transform_pipe.

We can also chain another pipe with this transform_pipe:

    pipes::filter([](int i) { return i % 2 == 0; })
>>= pipes::transform([](int i ){ return i * 2; })
>>= back_inserter(results);

This creates a filter_pipe, which is designed in the same spirit as transform_pipe, but with the transform_pipe as its NextPipe.

But if we write this expression, it doesn’t compile:

    pipes::filter([](int i) { return i % 2 == 0; })
>>= pipes::transform([](int i ){ return i * 2; });

Indeed, this tries to create a filter_pipe with a TransformFunctionWrapper as a next pipe. And the TransformFunctionWrapper is just a wrapper for a function. Calling send on such an object makes no sense and doesn’t compile.

The target design

Our target design is to be able to declare what we want the operator>>= to do, depending on the type of argument we pass to it:

  • 1) range >>= pipeline: iterate on the range and send each element to the pipeline
  • 2) pipe >>= pipeline: tack the pipe on to the pipeline and return the resulting pipeline
  • 3) pipe >>= pipe: create a composite pipe (which is still a pipe)

To do this, we need to clarify what we call a range, a pipe and a pipeline, and to centralise the code of operator>>=.

We’ve done 1) in a previous post. We’re going to do 2) now. We’ll do 3) in the next post.
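As a reminder of what case 1) amounts to, here is a rough sketch of a range >>= pipeline overload. It is not the code from the previous post: IsARange, sum_pipeline and send_range_to_sum are hypothetical names introduced for the example, and pipeline_base is reduced to an empty tag.

```cpp
#include <cassert>
#include <iterator>
#include <type_traits>
#include <utility>
#include <vector>

template<typename Derived>
struct pipeline_base {};  // stand-in: only used as a tag here

template<typename Pipeline, typename T>
void send(Pipeline& pipeline, T&& input)
{
    pipeline.onReceive(std::forward<T>(input));
}

template<typename Pipeline>
using IsAPipeline = std::enable_if_t<std::is_base_of<pipeline_base<Pipeline>, Pipeline>::value, bool>;

// Hypothetical check: a range is anything std::begin and std::end accept.
template<typename Range>
using IsARange = decltype(void(std::begin(std::declval<Range&>())),
                          void(std::end(std::declval<Range&>())),
                          bool{});

// Case 1): iterate on the range and send each element to the pipeline.
template<typename Range, typename Pipeline, IsARange<Range> = true, IsAPipeline<Pipeline> = true>
void operator>>=(Range&& range, Pipeline&& pipeline)
{
    for (auto&& element : range)
    {
        send(pipeline, element);
    }
}

// Hypothetical terminal pipeline: adds every received value to a total.
class sum_pipeline : public pipeline_base<sum_pipeline>
{
public:
    explicit sum_pipeline(int& total) : total_(&total) {}
    void onReceive(int value) { *total_ += value; }

private:
    int* total_;
};

inline int send_range_to_sum(std::vector<int> const& input)
{
    int total = 0;
    input >>= sum_pipeline{total};
    return total;
}
```

The overload is selected purely from the categories of its two arguments, which is the kind of dispatch the three cases above rely on.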

What’s a pipe? And what’s a pipeline?

Let’s use the following definitions for a pipe and for a pipeline:

  • a pipeline is something you can send data to, with the send function,
  • a pipe is something that can be combined with a pipeline to form a new pipeline.

Defining a pipeline

If a pipeline is something send can send data to, we already have examples of pipelines in the library: transform_pipe and filter_pipe, for instance.

But they’re called “pipes”, not “pipelines”! Let’s change their names to better reflect that. They’re now called transform_pipeline and filter_pipeline. And the same goes for the other components of the library: demux_pipeline, unzip_pipeline, and so on.

Let’s have a look again at transform_pipeline:

template<typename Function, typename NextPipe>
class transform_pipeline : public OutputIteratorBase<transform_pipeline<Function, NextPipe>>
{
public:
    template<typename T>
    void onReceive(T&& input)
    {
        send(nextPipe_, function_(input));
    }

    explicit transform_pipeline(Function function, NextPipe nextPipe) : function_(function), nextPipe_(nextPipe) {}
    
private:
    Function function_;
    NextPipe nextPipe_;
};

There is another strange name, on line 8 of this snippet: nextPipe_. If we’re sending data to it, then by our definition it is a pipeline, not a pipe. What’s that pipeline? It’s the rest of the pipeline, after the transform step. Let’s call that tailPipeline_.

Now let’s look at line 2: OutputIteratorBase. This is the class that allows transform_pipeline to receive data via the send function (and via STL algorithms), and that calls onReceive. It is what makes transform_pipeline a pipeline. So let’s rename this class pipeline_base.

Here is the code with the updated names. It should make more sense now:

template<typename Function, typename TailPipeline>
class transform_pipeline : public pipeline_base<transform_pipeline<Function, TailPipeline>>
{
public:
    template<typename T>
    void onReceive(T&& input)
    {
        send(tailPipeline_, function_(input));
    }

    explicit transform_pipeline(Function function, TailPipeline tailPipeline) : function_(function), tailPipeline_(tailPipeline) {}
    
private:
    Function function_;
    TailPipeline tailPipeline_;
};

We can now formally define the concept of a pipeline: a pipeline is a class that inherits from the class template pipeline_base, passing itself as the template parameter.

In C++20 we’ll be able to use C++ concepts, but the library is compatible with C++14, so we will emulate concepts with the C++ detection idiom. We will use the SFINAE trick based on a bool, explained in How to Make SFINAE Pretty and Robust:

template<typename Pipeline>
using IsAPipeline = std::enable_if_t<std::is_base_of<pipeline_base<Pipeline>, Pipeline>::value, bool>;
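Here is a small sketch of how such an alias can be used to constrain a function template. The types a_pipeline and not_a_pipeline and the function describe are hypothetical, and pipeline_base is reduced to an empty tag for the purpose of the example.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

template<typename Derived>
struct pipeline_base {};  // stand-in: the real base class does more than this

template<typename Pipeline>
using IsAPipeline = std::enable_if_t<std::is_base_of<pipeline_base<Pipeline>, Pipeline>::value, bool>;

struct a_pipeline : pipeline_base<a_pipeline> {};  // satisfies the definition
struct not_a_pipeline {};                          // does not

// Considered only when T is a pipeline: IsAPipeline<T> is then the type bool,
// and the unnamed bool template parameter takes its default value, true.
template<typename T, IsAPipeline<T> = true>
std::string describe(T const&)
{
    return "pipeline";
}

// Fallback. The ellipsis is the worst possible match, so this overload is
// chosen only when SFINAE has removed the one above.
inline std::string describe(...)
{
    return "not a pipeline";
}

static_assert(std::is_same<IsAPipeline<a_pipeline>, bool>::value,
              "for a pipeline, the alias is simply bool");
```

One thing to keep in mind with this definition: a reference type such as a_pipeline& is not a class type, so std::is_base_of yields false for it and the check would reject it.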

Defining a pipe

As we defined earlier, a pipe is something we can associate with a pipeline in order to make a new pipeline.

For example, the function transform returns a pipe. Until now, what transform returned was a TransformFunctionWrapper, a name that doesn’t carry much meaning.

Let’s give it a more appropriate name, now that we know that this is a pipe:

template<typename Function>
struct transform_pipe
{
    Function function_;
};

Since we need to associate a pipe with a pipeline to create a new pipeline, we’re going to define a (compile-time) interface for a pipe: a pipe is a type that has a member function called plug_to_pipeline that adds a transforming step to an incoming pipeline and returns the resulting pipeline.

For the transform pipe, that would be:

template<typename Function>
class transform_pipe
{
public:
    template<typename Pipeline>
    auto plug_to_pipeline(Pipeline&& pipeline) const
    {
        return transform_pipeline<Function, std::remove_reference_t<Pipeline>>{function_, pipeline};
    }
    
    explicit transform_pipe(Function function) : function_(function){}

private:
    Function function_;
};

We use std::remove_reference_t in case the function receives an lvalue, in which case Pipeline is deduced as a reference type.

And to define a pipe, we can use this definition:

struct aPipeline : pipeline_base<aPipeline>{};
template<typename Pipe>
using pipe_expression = decltype(std::declval<Pipe&>().plug_to_pipeline(std::declval<aPipeline&>()));

template<typename Pipe>
constexpr bool pipe_expression_detected = detail::is_detected<pipe_expression, Pipe>;

template<typename Pipe>
using IsAPipe = std::enable_if_t<pipe_expression_detected<Pipe>, bool>;
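The code above relies on detail::is_detected, which isn't shown here. A common way to write it in C++14 looks like the sketch below. This is an assumption about its shape, not a copy of the library's implementation; identity_pipe is a hypothetical type used only to exercise the check.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

namespace detail
{
    // C++14 replacement for std::void_t, which only arrives in C++17.
    template<typename...>
    struct make_void { using type = void; };

    template<typename... Ts>
    using void_t = typename make_void<Ts...>::type;

    // Primary template: chosen when Expression<Ts...> is ill-formed.
    template<typename AlwaysVoid, template<typename...> class Expression, typename... Ts>
    struct detector : std::false_type {};

    // Specialisation: viable only when Expression<Ts...> is a valid type.
    template<template<typename...> class Expression, typename... Ts>
    struct detector<void_t<Expression<Ts...>>, Expression, Ts...> : std::true_type {};

    template<template<typename...> class Expression, typename... Ts>
    constexpr bool is_detected = detector<void, Expression, Ts...>::value;
}

template<typename Derived>
struct pipeline_base {};  // stand-in for the library's base class

struct aPipeline : pipeline_base<aPipeline> {};

template<typename Pipe>
using pipe_expression = decltype(std::declval<Pipe&>().plug_to_pipeline(std::declval<aPipeline&>()));

template<typename Pipe>
constexpr bool pipe_expression_detected = detail::is_detected<pipe_expression, Pipe>;

// Hypothetical pipe that adds no step: it returns the pipeline it is given.
struct identity_pipe
{
    template<typename Pipeline>
    Pipeline plug_to_pipeline(Pipeline& pipeline) const { return pipeline; }
};

static_assert(pipe_expression_detected<identity_pipe>, "has plug_to_pipeline: it is a pipe");
static_assert(!pipe_expression_detected<int>, "no plug_to_pipeline: it is not a pipe");
```

The detection happens entirely at compile time: a type is a pipe as soon as the plug_to_pipeline call is a valid expression, whatever that call returns.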

Decoupling operator>>= from the classes

Now that we have defined pipes and pipelines, we can replace all the individual operator>>= of the various components with this single one:

template<typename Pipe, typename Pipeline, IsAPipe<Pipe> = true, IsAPipeline<Pipeline> = true>
auto operator>>=(Pipe&& pipe, Pipeline&& pipeline)
{
    return pipe.plug_to_pipeline(pipeline);
}
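To see the pieces working together, here is a reduced, self-contained sketch. Several parts are assumptions made for the example: pipeline_base is an empty tag, IsAPipe is a shortened expression-SFINAE version rather than the is_detected one, and sum_pipeline and run_demo are hypothetical. The two SFINAE parameters of operator>>= are given the default value true so that they don't have to be deduced.

```cpp
#include <cassert>
#include <initializer_list>
#include <type_traits>
#include <utility>

template<typename Derived>
struct pipeline_base {};  // stand-in: only used as a tag here

template<typename Pipeline, typename T>
void send(Pipeline& pipeline, T&& input)
{
    pipeline.onReceive(std::forward<T>(input));
}

template<typename Pipeline>
using IsAPipeline = std::enable_if_t<std::is_base_of<pipeline_base<Pipeline>, Pipeline>::value, bool>;

struct aPipeline : pipeline_base<aPipeline> {};

// Shortened IsAPipe: valid (and equal to bool) only if plug_to_pipeline can be called.
template<typename Pipe>
using IsAPipe = decltype(void(std::declval<Pipe&>().plug_to_pipeline(std::declval<aPipeline&>())), bool{});

template<typename Function, typename TailPipeline>
class transform_pipeline : public pipeline_base<transform_pipeline<Function, TailPipeline>>
{
public:
    template<typename T>
    void onReceive(T&& input)
    {
        send(tailPipeline_, function_(std::forward<T>(input)));
    }

    explicit transform_pipeline(Function function, TailPipeline tailPipeline)
        : function_(function), tailPipeline_(tailPipeline) {}

private:
    Function function_;
    TailPipeline tailPipeline_;
};

template<typename Function>
class transform_pipe
{
public:
    template<typename Pipeline>
    auto plug_to_pipeline(Pipeline&& pipeline) const
    {
        return transform_pipeline<Function, std::remove_reference_t<Pipeline>>{function_, pipeline};
    }

    explicit transform_pipe(Function function) : function_(function) {}

private:
    Function function_;
};

template<typename Function>
transform_pipe<std::decay_t<Function>> transform(Function&& function)
{
    return transform_pipe<std::decay_t<Function>>{std::forward<Function>(function)};
}

// The single operator>>= for "pipe >>= pipeline".
template<typename Pipe, typename Pipeline, IsAPipe<Pipe> = true, IsAPipeline<Pipeline> = true>
auto operator>>=(Pipe&& pipe, Pipeline&& pipeline)
{
    return pipe.plug_to_pipeline(pipeline);
}

// Hypothetical terminal pipeline: adds every received value to a total.
class sum_pipeline : public pipeline_base<sum_pipeline>
{
public:
    explicit sum_pipeline(int& total) : total_(&total) {}
    void onReceive(int value) { *total_ += value; }

private:
    int* total_;
};

inline int run_demo()
{
    int total = 0;
    // >>= is right-associative: the second transform is plugged onto the sum first.
    auto pipeline = transform([](int i) { return i + 1; })
                >>= transform([](int i) { return i * 10; })
                >>= sum_pipeline{total};
    for (int i : {1, 2, 3})
    {
        send(pipeline, i);
    }
    return total;  // (1+1)*10 + (2+1)*10 + (3+1)*10
}
```

Note that the sketch only ever passes temporaries on the right-hand side. With IsAPipeline written as above, a pipeline passed as an lvalue would deduce Pipeline as a reference type and be rejected, so a version meant to accept lvalues would need to strip the reference first.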

A consequence of centralising operator>>= this way is that pipes no longer work with std::back_inserter. Indeed, the iterator returned by std::back_inserter doesn’t derive from pipeline_base, and therefore is not a pipeline by our definition.

We therefore need to introduce a push_back pipeline that does the same thing as std::back_inserter except that it is a pipeline by our definition:

input >>= pipes::filter([](int i) { return i % 2 == 0; })
      >>= pipes::transform([](int i ){ return i * 2; })
      >>= pipes::push_back(results);

But using std::back_inserter specifically doesn’t bring any benefit anyway.

The same goes for the begin iterators of containers, which we replace with a new pipes::begin.
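For illustration, these two pipelines could be sketched as follows. This is only a guess at their shape, not the library's implementation: the class names push_back_pipeline and begin_pipeline, the namespace sketch and the two demo functions are hypothetical, and pipeline_base is again reduced to an empty tag.

```cpp
#include <cassert>
#include <initializer_list>
#include <utility>
#include <vector>

// A namespace keeps our begin from clashing with std::begin through ADL.
namespace sketch
{
    template<typename Derived>
    struct pipeline_base {};  // stand-in for the library's base class

    template<typename Pipeline, typename T>
    void send(Pipeline& pipeline, T&& input)
    {
        pipeline.onReceive(std::forward<T>(input));
    }

    // Plays the role of std::back_inserter: appends every received value.
    template<typename Container>
    class push_back_pipeline : public pipeline_base<push_back_pipeline<Container>>
    {
    public:
        explicit push_back_pipeline(Container& container) : container_(&container) {}

        template<typename T>
        void onReceive(T&& input) { container_->push_back(std::forward<T>(input)); }

    private:
        Container* container_;
    };

    template<typename Container>
    push_back_pipeline<Container> push_back(Container& container)
    {
        return push_back_pipeline<Container>{container};
    }

    // Plays the role of a begin iterator: overwrites existing elements,
    // moving one position forward after each received value.
    template<typename Iterator>
    class begin_pipeline : public pipeline_base<begin_pipeline<Iterator>>
    {
    public:
        explicit begin_pipeline(Iterator position) : position_(position) {}

        template<typename T>
        void onReceive(T&& input)
        {
            *position_ = std::forward<T>(input);
            ++position_;
        }

    private:
        Iterator position_;
    };

    template<typename Container>
    auto begin(Container& container)
    {
        return begin_pipeline<decltype(container.begin())>{container.begin()};
    }
}

inline std::vector<int> demo_push_back()
{
    std::vector<int> results;
    auto pipeline = sketch::push_back(results);
    for (int i : {1, 2, 3}) sketch::send(pipeline, i);
    return results;
}

inline std::vector<int> demo_begin()
{
    std::vector<int> results = {0, 0, 0, 0};
    auto pipeline = sketch::begin(results);
    for (int i : {7, 8}) sketch::send(pipeline, i);
    return results;
}
```

As with a raw begin iterator, the begin-style pipeline assumes the container already has enough elements to be overwritten.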

What we did

All this was mainly refactoring, which means that we haven’t added any new features. We only reorganised the code.

In particular:

  • we improved the naming of our classes and clarified their responsibilities,
  • we introduced interfaces with concepts, emulated with the detection idiom.

Is the design now definitive? Probably not. We’ll discover new improvements to make as we go along. If you see something you’d like to see improved, please let me know by dropping a comment or submitting a PR on the GitHub repository.

But this refactoring allowed us to centralise the code of operator>>=, which is necessary to implement the next feature we saw at the beginning of this article: composite pipes!

This is what we do in the next post. Stay tuned!
