Jonathan Boccara's blog

Piping To and From a Stream

Published October 18, 2019

So far, the pipes library is able to read from a collection and output to one or several collections.

For example, to multiply by 2 the numbers greater than 3 from a collection inputs and output the results in the collections outputs1 and outputs2, we can write:

auto const inputs = std::vector<int>{1, 2, 3, 4, 5};
auto outputs1 = std::vector<int>{};
auto outputs2 = std::vector<int>{};

inputs >>= pipes::filter([](int i){ return i > 3; })
       >>= pipes::transform([](int i){ return i * 2; })
       >>= pipes::demux(pipes::push_back(outputs1),
                        pipes::push_back(outputs2));

After running this program, outputs1 and outputs2 each contain 8 and 10.

What if we want to read from an input stream and/or write to an output stream?

An input stream can be the standard input (std::cin), a file (std::ifstream) or a string (std::istringstream).

An output stream can be the standard output (std::cout), a file (std::ofstream) or a string (std::ostringstream).

Let’s make the pipes write to a stream and read from a stream.

Writing to a stream

The standard library offers an output iterator to write to a stream: std::ostream_iterator. std::ostream_iterator receives data and sends it to a stream. This is pretty close to what we want to achieve. Let’s analyse std::ostream_iterator in more detail.

First, here is a basic usage of std::ostream_iterator:

auto const inputs = std::vector<int>{1, 2, 3, 4, 5};

std::transform(begin(inputs), end(inputs),
               std::ostream_iterator<int>(std::cout),
               [](int i){ return i * 2; });

This program writes to the standard output:

246810
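
To check this output without touching the console, here is a small sketch that sends the same std::transform call to a std::ostringstream. The function name doubled_to_stream and its delimiter parameter are made up for this illustration. The delimiter is the optional second constructor argument of std::ostream_iterator, written after every value.

```cpp
#include <algorithm>
#include <iterator>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: doubles each input and writes the results to a string
// stream through std::ostream_iterator. The delimiter (empty by default) is
// written after every value, including the last one.
std::string doubled_to_stream(std::vector<int> const& inputs, char const* delimiter = "")
{
    std::ostringstream out;
    std::transform(begin(inputs), end(inputs),
                   std::ostream_iterator<int>(out, delimiter),
                   [](int i){ return i * 2; });
    return out.str();
}
```

With the inputs 1 to 5, doubled_to_stream returns "246810" without a delimiter, and "2 4 6 8 10 " (with a trailing space) when the delimiter is " ".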

Note that std::ostream_iterator requires a template argument representing the type of values it expects to receive. Here, int.

In this particular use case, specifying the template argument seems superfluous. Indeed, when we pass something to std::cout we don’t specify its type:

std::cout << i;

We could expect std::ostream_iterator not to require a template parameter.

Let’s try to design a pipe that does the same thing as std::ostream_iterator but that doesn’t require specifying the type of value it receives.

Implementing to_out_stream

Here is the implementation of to_out_stream. We will go through it line by line just after:

template<typename OutStream>
class to_out_stream_pipeline : public pipeline_base<to_out_stream_pipeline<OutStream>>
{
public:
    template<typename T>
    void onReceive(T&& value)
    {
        outStream_.get() << FWD(value);
    }
    
    explicit to_out_stream_pipeline(OutStream& outStream) : outStream_(outStream) {}

private:
    std::reference_wrapper<OutStream> outStream_;
};

template<typename OutStream>
to_out_stream_pipeline<OutStream> to_out_stream(OutStream& outStream)
{
    return to_out_stream_pipeline<OutStream>(outStream);
}

Here is an explanation of this code:

template<typename OutStream>

The class accepts any type of output stream (as long as it supports the operations we call on it further down in the class’s implementation).

class to_out_stream_pipeline : public pipeline_base<to_out_stream_pipeline<OutStream>>

In the language of the internals of the pipes library, a pipe is something that plugs itself onto the left of a pipeline. The component we’re designing is the terminal part of a pipeline and can receive data, so it is itself a pipeline and not a pipe.

To be integrated with the rest of the library and benefit from its features (compatibility with operator>>=, with STL algorithms, …) we inherit from the CRTP base class pipeline_base.

public:
    template<typename T>
    void onReceive(T&& value)
    {
        outStream_.get() << FWD(value);
    }

pipeline_base expects an onReceive member function that it calls when the component is sent a value. In our case, we want to send that value to the output stream.

FWD is a macro that expands to std::forward with the right type, and avoids burdening the code with reference treatments.
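
The definition of FWD is not shown in this post. A common way to write such a macro, and the one assumed in the sketch below, is to forward the value with its declared type obtained through decltype. The write_to and fwd_demo functions are hypothetical helpers for the illustration.

```cpp
#include <sstream>
#include <string>
#include <utility>

// Assumed definition of FWD: forwards `value` with its declared type.
#define FWD(value) std::forward<decltype(value)>(value)

// Hypothetical helper: streams any value, preserving its value category.
template<typename Stream, typename T>
void write_to(Stream& stream, T&& value)
{
    stream << FWD(value); // equivalent to stream << std::forward<T>(value)
}

std::string fwd_demo()
{
    std::ostringstream out;
    auto const text = std::string{"lvalue "};
    write_to(out, text);                   // T is deduced as std::string const&
    write_to(out, std::string{"rvalue "}); // T is deduced as std::string
    write_to(out, 42);                     // T is deduced as int
    return out.str();
}
```

Calling fwd_demo returns "lvalue rvalue 42". The macro saves us from repeating the template parameter at each forwarding site.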

explicit to_out_stream_pipeline(OutStream& outStream) : outStream_(outStream) {}

We construct the pipeline with an output stream.

private:
    std::reference_wrapper<OutStream> outStream_;
};

We want to store a reference to the output stream. But we also want the pipeline to support operator= (indeed, pipes can be used as output iterators and the debug mode of the STL of MSVC can call operator= on an output iterator on itself). So we store it as a std::reference_wrapper.
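
To see why the plain reference would not do, here is a minimal sketch comparing two hypothetical sinks (sink_with_reference and sink_with_wrapper are names invented for this example, not part of the library). A reference member makes the compiler delete the implicit copy assignment operator, whereas a std::reference_wrapper member keeps it.

```cpp
#include <functional>
#include <iostream>
#include <sstream>
#include <type_traits>

// Hypothetical sink holding a plain reference member.
struct sink_with_reference
{
    std::ostream& out_;
};

// Hypothetical sink holding a std::reference_wrapper instead.
struct sink_with_wrapper
{
    std::reference_wrapper<std::ostream> out_;
};

// A reference member deletes the implicit copy assignment operator...
static_assert(!std::is_copy_assignable<sink_with_reference>::value, "");
// ...whereas std::reference_wrapper can be rebound, so operator= is generated.
static_assert(std::is_copy_assignable<sink_with_wrapper>::value, "");

// Assigns a sink to itself, then checks it still writes to the same stream.
bool self_assignment_works()
{
    std::ostringstream out;
    auto sink = sink_with_wrapper{out};
    auto& alias = sink;
    sink = alias;
    sink.out_.get() << "ok";
    return out.str() == "ok";
}
```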

template<typename OutStream>
to_out_stream_pipeline<OutStream> to_out_stream(OutStream& outStream)
{
    return to_out_stream_pipeline<OutStream>(outStream);
}

Finally, we make a helper function so that the user doesn’t have to pass the template parameter representing the type of the stream. In C++17, class template argument deduction from constructors would make this helper unnecessary, but the library is compatible with C++14.
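
For comparison, here is what the C++17 route could look like on a simplified stand-in. stream_sink is a hypothetical class that mimics to_out_stream_pipeline without pipeline_base, so this is a sketch of the language feature and not the library’s code. It needs a C++17 compiler.

```cpp
#include <functional>
#include <sstream>
#include <string>
#include <utility>

// Simplified, hypothetical stand-in for to_out_stream_pipeline (no pipeline_base).
template<typename OutStream>
class stream_sink
{
public:
    explicit stream_sink(OutStream& outStream) : outStream_(outStream) {}

    template<typename T>
    void onReceive(T&& value)
    {
        outStream_.get() << std::forward<T>(value);
    }

private:
    std::reference_wrapper<OutStream> outStream_;
};

std::string ctad_demo()
{
    std::ostringstream out;
    auto sink = stream_sink(out); // C++17 deduces stream_sink<std::ostringstream>
    sink.onReceive(8);            // the member template accepts an int...
    sink.onReceive(", ");         // ...a string literal...
    sink.onReceive(10);           // ...and any other streamable type
    return out.str();
}
```

Calling ctad_demo returns "8, 10". No helper function is involved: the constructor argument is enough for the compiler to deduce the template parameter.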

Usage

Back to our initial example, we can now output data to a stream:

auto const inputs = std::vector<int>{1, 2, 3, 4, 5};
auto outputs1 = std::vector<int>{};

inputs >>= pipes::filter([](int i){ return i > 3; })
       >>= pipes::transform([](int i){ return i * 2; })
       >>= pipes::demux(pipes::push_back(outputs1),
                        pipes::to_out_stream(std::cout));

Note that, unlike std::ostream_iterator, we don’t need to specify the type of data to pass to the stream. The type is resolved by the template member function of the component.

Reading from an input stream

To read from an input stream, the STL offers std::istream_iterator. We already covered this iterator in How to split a string in C++, which you can refer to for a detailed presentation of its design.

A basic usage of std::istream_iterator is this:

auto values = std::vector<int>{};

std::copy(std::istream_iterator<int>{std::cin}, std::istream_iterator<int>{},
          std::back_inserter(values));

This code reads integers coming from the standard input, and stores them in the values collection.
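
To try this without typing on the standard input, the same std::copy call can read from a std::istringstream. In this sketch, read_ints is a hypothetical helper name.

```cpp
#include <algorithm>
#include <iterator>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: reads whitespace-separated integers from `text`.
// Reading stops at the end of the stream or at the first token that is not an int.
std::vector<int> read_ints(std::string const& text)
{
    std::istringstream input(text);
    auto values = std::vector<int>{};
    std::copy(std::istream_iterator<int>{input}, std::istream_iterator<int>{},
              std::back_inserter(values));
    return values;
}
```

For instance, read_ints("1 2 3") returns a vector containing 1, 2 and 3, while read_ints("4 5 x 6") stops at the x and returns only 4 and 5.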

If this code doesn’t look crystal clear to you, check out the beginning of How to split a string in C++.

Note that like std::ostream_iterator, std::istream_iterator expects a template argument indicating the type of data it reads. But in this case this is legitimate: a stream contains raw data, so you need to decide what type to interpret it as.

This is also why we need to specify a type when we use std::cin:

auto i = int{};
std::cin >> i;
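
To illustrate that the type is a decision made by the reader and not a property of the stream, this sketch extracts the first token of the same text as different types. read_first is a hypothetical helper written for the example.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: reads the first token of `text` as a value of type T.
template<typename T>
T read_first(std::string const& text)
{
    std::istringstream input(text);
    auto value = T{};
    input >> value; // the overload of operator>> is chosen from the type of value
    return value;
}
```

With the text "42 apples", read_first<int> gives the number 42, read_first<std::string> gives the string "42", and read_first<char> gives the character '4'.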

The interface to pipe from a stream

To make the pipes library able to read inputs from a stream, there are several possible interfaces.

For example, we can pass the stream to a special pipe:

auto outputs1 = std::vector<int>{};
auto outputs2 = std::vector<int>{};

std::cin >>= pipes::read_in_stream<int>{}
         >>= pipes::filter([](int i){ return i > 3; })
         >>= pipes::transform([](int i){ return i * 2; })
         >>= pipes::demux(pipes::push_back(outputs1),
                          pipes::push_back(outputs2));

Or we can wrap the stream:

auto outputs1 = std::vector<int>{};
auto outputs2 = std::vector<int>{};

pipes::read<int>(std::cin)
         >>= pipes::filter([](int i){ return i > 3; })
         >>= pipes::transform([](int i){ return i * 2; })
         >>= pipes::demux(pipes::push_back(outputs1),
                          pipes::push_back(outputs2));

Which interface do you prefer, the first one or the second one? Please leave a comment below.

For the moment, let’s implement the first one. The implementation of the second one should be very close anyway.

Implementing read_in_stream

The implementation of read_in_stream can be done in two steps.

First, we’ll implement the read_in_stream type and the operator>>= to associate it with the pipeline on its right (operator>>= is right associative). Then we’ll implement the operator>>= that takes the input stream on the left hand side.

Let’s start by creating read_in_stream and the operator>>= on its right:

template<typename Value, typename Pipeline>
struct read_in_stream_pipeline
{
    Pipeline pipeline_;
    explicit read_in_stream_pipeline(Pipeline& pipeline) : pipeline_(pipeline){}
};

template<typename Value>
struct read_in_stream {};

template<typename Value, typename Pipeline>
auto operator>>= (read_in_stream<Value> readInStreamPipe, Pipeline&& pipeline)
{
    return read_in_stream_pipeline<Value, std::decay_t<Pipeline>>{pipeline};
}

Nothing really complex here: read_in_stream, when associated with a pipeline, returns a read_in_stream_pipeline that contains that pipeline.

Now we can implement the operator>>= that takes the input stream:

template<typename InStream, typename Value, typename Pipeline>
void operator>>= (InStream&& inStream, read_in_stream_pipeline<Value, Pipeline> readInStreamPipe)
{
    for (auto inValue = std::istream_iterator<Value>{inStream}; inValue != std::istream_iterator<Value>{}; ++inValue)
    {
        pipes::send(*inValue, readInStreamPipe.pipeline_);
    }
}

With std::istream_iterator under our belt, this operator is easy to implement.
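
To see the two operators work together outside of the library, here is a self-contained sketch. It reuses the two operator>>= above and replaces the rest with stand-ins: the sketch namespace, push_back_sink and read_through_pipeline are hypothetical names, and send is only a guess at what pipes::send does (calling onReceive on the pipeline), since its implementation is not shown here.

```cpp
#include <iterator>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

namespace sketch
{
// Hypothetical terminal pipeline: appends every received value to a container.
template<typename Container>
struct push_back_sink
{
    Container* container_;

    template<typename T>
    void onReceive(T&& value) { container_->push_back(std::forward<T>(value)); }
};

// Stand-in for pipes::send: hands a value over to the pipeline.
template<typename T, typename Pipeline>
void send(T&& value, Pipeline& pipeline) { pipeline.onReceive(std::forward<T>(value)); }

template<typename Value, typename Pipeline>
struct read_in_stream_pipeline
{
    Pipeline pipeline_;
    explicit read_in_stream_pipeline(Pipeline& pipeline) : pipeline_(pipeline) {}
};

template<typename Value>
struct read_in_stream {};

// Step 1: read_in_stream >>= pipeline (evaluated first, as >>= is right associative).
template<typename Value, typename Pipeline>
auto operator>>= (read_in_stream<Value>, Pipeline&& pipeline)
{
    return read_in_stream_pipeline<Value, std::decay_t<Pipeline>>{pipeline};
}

// Step 2: stream >>= (result of step 1). Reads values until extraction fails.
template<typename InStream, typename Value, typename Pipeline>
void operator>>= (InStream&& inStream, read_in_stream_pipeline<Value, Pipeline> readInStreamPipe)
{
    for (auto inValue = std::istream_iterator<Value>{inStream}; inValue != std::istream_iterator<Value>{}; ++inValue)
    {
        sketch::send(*inValue, readInStreamPipe.pipeline_);
    }
}
} // namespace sketch

std::vector<int> read_through_pipeline(std::string const& text)
{
    std::istringstream input(text);
    auto results = std::vector<int>{};
    input >>= sketch::read_in_stream<int>{}
          >>= sketch::push_back_sink<std::vector<int>>{&results};
    return results;
}
```

Calling read_through_pipeline("1 2 3 4 5") returns a vector containing the five integers. Note that the sink is copied into read_in_stream_pipeline, which is why this stand-in holds a pointer to the container and does not own it.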

Pipes and streams

Making the pipes library compatible with streams was a good refresher on how streams and stream iterators work in C++.

The implementation to read from and write to a stream was not very complex, which is a good sign for the design of the library.

A more important question is the interface. Do you prefer the first interface or the second one to read from a stream? Leave a comment below to let me know.
