A Tree of Pipes

Published October 25, 2019

Today we have a guest post by Till Heinzel. Till is a physicist-turned-software engineer with a focus on code quality and a passion for C++, particularly metaprogramming. You can find Till on LinkedIn or on his shiny new blog.

Pipes are pretty neat, don’t you think? They are a great metaphor for what they try to achieve, syntactically simpler than STL algorithms, composable, and avoid several of the issues of ranges. I can definitely see myself using them in my own C++ code in the future.

One thing that pipes still lack, however, is reusability. In a recent series of blog posts, Jonathan presents an approach to allow for the creation of reusable composites of pipes. Here, I would like to share my own approach to implementing pipes, and how it allows reusability in a different way.

Note: Throughout the post, variables written as capital letters (A, B, C, D, …) represent collections. I think it’s easiest to just think of them as std::vector<int> or something similarly well-behaved.

Another note: The code here uses C++17 and is optimized for presentation, so it omits some boilerplate, constructors, etc., and is profligate with unnecessary copies. In the actual code I have done my best to avoid such issues. You can check it out on GitHub.

Pipe expressions are trees

The central idea for this implementation is that the expressions we create when using pipes make up a tree structure. Let’s look at an example that illustrates this:

A >>= demux(pipes::filter(...) >>= pipes::push_back(B), 
            pipes::transform(...) >>= pipes::push_back(C));

Going from left to right, we have:

  • A range A, over which we loop and send every element on to
    • demux, a pipe that sends its input on to
      • filter, which checks some predicate and sends the valid data on to
        • push_back(B), which calls B.push_back with its inputs
      • transform, which applies some function to its inputs and sends the transformed data on to
        • push_back(C), which calls C.push_back with its inputs

This flow makes the tree structure clear, along with the different kinds of nodes we have:

  • The “funnel” node, which contains a reference to a range A and a single child. It has no parent, making it the root node of the tree.
  • demux, which has a parent and one or more children.
  • filter and transform, which both have a parent and a single child.
  • end nodes, which transfer data out of this pipeline into some collections B and C, and have a parent, but no children. They are thus leaf nodes.

Note: I am not going to go into detail with the root, as it is not required for the reusability of pipes.

Implementing and parsing the tree

To evaluate the expression, each node, except for the root, needs a way to receive data, something to do with said data, and somewhere to send the (possibly modified) data. We can express this in code as:

template<class Op, class... Tails>
class Node{
  Op op;
  std::tuple<Tails...> tails;
  // 0 or more Tails: 0 for end nodes, 1 for transform and filter, any number for demux
  template<class T>
  void send(T&& t)
  {
    // unpack the stored tails and hand them to the Op together with the data
    auto f = [this, &t](auto&... tails){ op.send(std::forward<T>(t), tails...); };
    std::apply(f, tails);
  }
};

Here, Op is the thing that knows what to do with the data, and is what differentiates pipes. For example, the transform, demux and push_back Ops look like this:

template<class F>
class Transform{
  F transformation;  
  template<class T, class Tail>
  void send(T&& t, Tail& tail) 
  {
    tail.send(transformation(std::forward<T>(t)));
  }
};

class Demux{
  template<class T, class... Tails>
  void send(const T& t, Tails&... tails) 
  {
    // T must be copyable for demux to work
    (tails.send(t), ...); // fold expressions are neat
  }
};

template<class Pushable>
class PushBack{
  Pushable& pushable;
  template<class T>
  void send(T&& t) 
  {
    pushable.push_back(std::forward<T>(t));
  }
};
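The filter Op is not shown above, but it follows the same pattern: it checks a predicate and only forwards the data that satisfies it. A minimal sketch, with names of my own choosing rather than those of the actual implementation:

template<class Predicate>
class Filter{
  Predicate predicate;
  template<class T, class Tail>
  void send(T&& t, Tail& tail)
  {
    // only forward data that satisfies the predicate
    if(predicate(t))
    {
      tail.send(std::forward<T>(t));
    }
  }
};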

We could have implemented this using CRTP as well, but this composition approach separates the precise implementation of individual pipes from the storage mechanism, and makes it easy to implement reusability. If we add the required typedefs and operator* and operator++, we can also allow Node to be used as an output iterator, but that is, again, not necessary to get reusability.
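As a rough sketch (my own assumption, not necessarily how the actual code does it), those extra members could look like this, so that e.g. std::copy(begin(A), end(A), node) sends every element through the pipe:

// hypothetical members added inside Node to make it usable as an output iterator
using iterator_category = std::output_iterator_tag;
using value_type = void;
using difference_type = void;
using pointer = void;
using reference = void;

Node& operator*() { return *this; }   // dereferencing yields the node itself
Node& operator++() { return *this; }  // incrementing is a no-op
Node& operator++(int) { return *this; }

template<class T>
Node& operator=(T&& t) { send(std::forward<T>(t)); return *this; } // assignment feeds the pipe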

Creating Pipe Nodes

One issue that is not shown here is the creation of pipes. What should the transform(…), filter(…) and demux(…) functions return? Practically, it would make sense if they were Nodes, so we could have operator>>= operate only on Nodes, but Nodes need to know the tail of the pipe (or tailpipe, if you will), which is not known for transform and filter before operator>>= is called. We can resolve this, and have an excellent starting point for reusability, if we add a placeholder for unconnected tails:

struct OpenConnectionPlaceholder{};

and have e.g. the transform-function return a node with an open connection:

template<class F>
auto transform(F f)
{
  return Node<Transform<F>, OpenConnectionPlaceholder>(...);
}

The connection is then “closed” by operator>>= by creating a new Node, which moves the operation and replaces the open tail with the RHS.

template<class Lhs, class Rhs, isNode<Lhs> = true, isNode<Rhs> = true>
auto operator>>=(Lhs lhs, Rhs rhs)
{
  return Node(lhs.op, rhs);
}

where isNode is a SFINAE check as in making SFINAE pretty and robust.

The problem does not arise for multi-child pipes such as demux, because it takes its children as parameters at construction. We focus on the single-child pipes for now and extend the concepts to multi-child pipes later. There is also no problem for endpipes, because they do not have any children at all. So we are now all set to create and connect pipe-nodes.
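To illustrate, here is a sketch of what the demux and push_back factory functions could look like under this scheme, assuming Node has a suitable constructor (which is omitted above); the actual signatures may differ:

template<class... Children>
auto demux(Children... children)
{
  // demux receives all of its children up front, so no placeholder is needed
  return Node<Demux, Children...>(Demux{}, std::move(children)...);
}

template<class Pushable>
auto push_back(Pushable& pushable)
{
  // end nodes have no children at all
  return Node<PushBack<Pushable>>(PushBack<Pushable>{pushable});
}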

Open vs. Closed pipes

With OpenConnectionPlaceholder, we need to distinguish between Nodes that have an open connection somewhere, and those that don’t. We want different behaviours based on whether a Node is “open” (has any OpenConnectionPlaceholder) or “closed”:

  • closed Nodes can be used as output iterators. Open Nodes can’t, as data would just be pushed into nothing; that is what dev_null does in an explicit manner.
  • closed Nodes can be used as the RHS for operator>>= where the LHS is a root node. This is due to the same issue.
  • Open Nodes are allowed on the LHS of operator>>=, closed ones aren’t.

I am not going to go into too much detail here, but I ended up implementing this as two different kinds of node, Node for open nodes and Output for closed nodes. When the last connection of a Node is closed, it is turned into an Output.

Single child reusability

We almost have reusability of the kind

auto pipe = filter(...) >>= transform(...);
A >>= pipe >>= push_back(B);

but not quite. pipe >>= push_back(B) would replace the tail of the filter with push_back(B), instead of the tail of the transform.
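Spelled out with the types (roughly):

auto pipe = filter(...) >>= transform(...);
// pipe is roughly a Node<Filter<...>, Node<Transform<...>, OpenConnectionPlaceholder>>

// With the operator>>= above, pipe >>= push_back(B) would rebuild the filter node with
// the push_back node as its tail, producing
//   Node<Filter<...>, /* push_back node */>                       // the transform is lost!
// instead of the intended
//   Node<Filter<...>, Node<Transform<...>, /* push_back node */>>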

We can remedy this by recursively looking for an OpenConnectionPlaceholder in the operator:

template<class Lhs, class Rhs>
auto operator>>=(Lhs lhs, Rhs rhs)
{
  if constexpr(hasOpenConnection<Lhs>)
  {
    // the open connection is the direct tail of lhs: close it with rhs
    return Node(lhs.op, rhs);
  }
  else
  {
    // otherwise, keep the op and recurse into the single child until we reach it
    return Node(lhs.op, std::get<0>(lhs.tails) >>= rhs);
  }
}

Now the operator rebuilds the tree: it recursively descends until it finds the open connection and replaces it with the new node.

Note: In reality, this becomes more messy because operator>>= needs to account for quite a few more situations, and also give good error messages when misused.
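The hasOpenConnection check used above is not spelled out here; one possible sketch that is compatible with the operator (hypothetical, the real implementation may differ) tests whether the node’s own tail slot is still the placeholder:

// hypothetical sketch of hasOpenConnection (requires <type_traits>): true if the
// node's own tail slot is still the OpenConnectionPlaceholder
template<class T>
struct HasOpenConnection : std::false_type{};

template<class Op>
struct HasOpenConnection<Node<Op, OpenConnectionPlaceholder>> : std::true_type{};

template<class T>
constexpr bool hasOpenConnection = HasOpenConnection<T>::value;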

Multi-child reusability

The OpenConnectionPlaceholder was not required to create multi-child pipes like demux, but we can use it to make those reusable as well. This requires that we add it to the pipes API in some way. I chose to add it as a simple global constant in the pipes namespace:

namespace pipes
{
  constexpr auto _ = OpenConnectionPlaceholder{};
}

I think _ is a neat name for a placeholder in expressions, but something more verbose is also a possibility.

This allows creating pipes as

auto pipe = demux(push_back(B), _, _);

But how can we use such a pipe afterwards? To me, the least surprising possibility is operator() on the node:

auto pipe = demux(push_back(B), _, _);
auto fullpipe = pipe(push_back(C), push_back(D));

This requires the operator to be implemented so that it takes a number of nodes and plugs them into the open connections. The idea is essentially the same as for the simple reusability using operator>>=: we find an open connection, and create a new node that replaces that connection with the node we passed into the operator. However, now that we are dealing with nodes that can have any number of children, we also need to find the correct OpenConnectionPlaceholder to replace.

The Connect algorithm: first steps

In the simple example above, the algorithm seems pretty clear: iterate over the children of the pipe and, whenever a child is an OpenConnectionPlaceholder, replace it with the next parameter.

In the actual implementation, the parameters and children are stored as tuples, and we need to use some metaprogramming to implement the algorithm. For the sake of developing the algorithm, let’s pretend they are stored in vectors instead, as that is easier to read. So the first algorithm could look something like this:

for(auto p: parameters){
  for(auto& child: children){
    if(isOpenConnectionPlaceholder(child)) {
      replace(child, p);
      break;
    }
  }
}

Connecting with nested open connections

This works for the simple situation, but it gets more complicated when we consider children with open connections:

auto pipe = demux(_, demux(_, _), _);

In this case, we definitely need to fill the nested open connections in, but in what order? Based on the interface, I believe the most sensible ordering is

auto pipe = demux(1, demux(2, 3), 4);

so that

pipe(A, B, C, D);

is the same as

auto pipe = demux(A, demux(B,C), D);

We can achieve this by recursively checking the children of the node:

for(auto p: parameters)
{
  for(auto& child: children)
  {
    if(isOpenConnectionPlaceholder(child)) 
    {
      replace(child, p);
      break;
    }
    if(hasOpenConnections(child))
    {
      child(p); // recursion step
      break;
    }
  }
}

This is essentially the same approach that we used for operator>>= on single-child nodes, and is a depth-first algorithm: we go along one branch of the tree until we find an open connection, and replace it. It assumes that we can call operator() with a single parameter, even if there is more than one open connection, but there is no reason not to allow that anyway, so we are good.

The Connect algorithm, final iteration: passing open connections as parameters

So far, all that we have passed as parameters have been closed nodes. Let’s see if the algorithm holds up if we can also pass parameters with open connections:

auto pipe = demux(_, _); 
auto pipe2 = pipe(demux(_,_), B);

We would expect this to result in

auto pipe = demux(demux(_, _), B);

Let’s see if that is what we would get. With the depth first algorithm above, we will first plug the new demux into the first open slot of the pipe, so we get:

auto pipe = demux(demux(_, _), _);

However, when we try to plug in B, the algorithm goes depth-first into this whole new tree, and we end up with:

auto pipe = demux(demux(B, _), _);

which is not correct! We will need a different approach.

I tried different methods, and the algorithm I ended up with works like this:

for(auto& child: children)
{
  auto n = openConnectionCount(child);
  auto paramsToPass = takeFirst(min(n, size(params)), params);
  child(paramsToPass);
  if(params.empty()) break;
}

For each child we

  1. figure out how many open connections exist in its subtree,
  2. take up to that many nodes from the parameters, removing them from the original list,
  3. recursively call operator() in that subtree with the parameters we took,
  4. and once we have placed all parameters, we stop.

The algorithm is essentially still depth-first, but it has an aspect of breadth-first as well, as we split the parameters at each level based on the open connections that each child has. Not only does this work in all the above cases, it is also simpler than the previous versions of the algorithm.
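Step 1, counting the open connections in a subtree, has to happen at compile time, since the shape of the tree is encoded in the types. A minimal sketch of such a count (hypothetical names, assuming the Node template from above):

// hypothetical sketch: count the OpenConnectionPlaceholders in a subtree at compile time
template<class T>
struct OpenConnectionCount : std::integral_constant<std::size_t, 0>{};

template<>
struct OpenConnectionCount<OpenConnectionPlaceholder> : std::integral_constant<std::size_t, 1>{};

template<class Op, class... Tails>
struct OpenConnectionCount<Node<Op, Tails...>>
  : std::integral_constant<std::size_t, (std::size_t{0} + ... + OpenConnectionCount<Tails>::value)>{};

template<class T>
constexpr auto openConnectionCount = OpenConnectionCount<T>::value;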

Conclusion

We have seen how one can construct pipes as a tree structure, and how creating reusable pieces of pipe corresponds to holes in that structure, which we can fill later. We have also established algorithms for plugging these holes to create complete pipelines using operator>>= for simple pipes like transform and filter, and how we can extend that functionality to be usable for multi-child pipes such as demux through operator().

The ideas presented here are somewhat high-level, and the actual implementation contains a lot of nitty-gritty details necessary to make it all fit together; you are welcome to have a look at it on GitHub. It is way less readable than Jonathan's implementation, so maybe I will clean it up at some point. Any comments or questions are welcome.

Finally, thanks to Jonathan for running a blog with as many useful ideas as fluentcpp, and allowing me to contribute to it.
