Sunday, January 18, 2015

Compile time quicksort in idiomatic modern C++

A contender for the most useless program ever written just got a much needed overhaul. In 2011 I wrote about compile time quick sort as a challenge to myself, and as an exercise in the then newfangled variadic templates. Now, having worked with C++11 for a few years, and lately also C++14, I see that the code I wrote then is unnecessarily clumsy and highly unidiomatic.

In C++14 the data representation is easy. std::integer_sequence<T, ts...> fits perfectly. It really is the same as I used in 2011, but this time it's provided by the standard library.

Output

In order to show sequences, an output streaming operator is handy:
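#include <algorithm>  // std::copy
#include <iostream>   // std::ostream, std::cout
#include <iterator>   // std::begin, std::end, std::ostream_iterator
#include <utility>    // std::integer_sequence

template <typename T, T ... ts>
std::ostream& operator<<(std::ostream& os,
                         const std::integer_sequence<T, ts...>&)
{
  os << "{ ";
  static const T values[] { ts... };
  std::copy(std::begin(values),
            std::end(values),
            std::ostream_iterator<T>(os, " "));
  return os << "}";
}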
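Note that the value of the integer sequence is not used. All information is in the type. That's why filling the values in a static array makes sense: the output stream operator function will be instantiated for each different sequence being output by a program. One caveat is that an empty sequence would produce a zero-length array, which standard C++ does not allow, so the operator as written only suits non-empty sequences.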
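If empty sequences need to be printed too, one possible variation is to stream each element directly through a pack expansion instead of filling an array. The sketch below is an assumption about how that could look, not part of the original code. The helper format_sequence and the function check_formatting are hypothetical names, added only to make the result easy to inspect.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

// Variation of the output operator that also accepts empty sequences.
template <typename T, T ... ts>
std::ostream& operator<<(std::ostream& os,
                         const std::integer_sequence<T, ts...>&)
{
  os << "{ ";
  // Braced initializer lists are evaluated left to right, so the elements
  // are streamed in order. The leading 0 keeps the array non-empty.
  const int expand[] = { 0, (void(os << ts << ' '), 0)... };
  (void)expand;
  return os << "}";
}

// Hypothetical helper: renders a sequence to a string for inspection.
template <typename Seq>
std::string format_sequence(Seq seq)
{
  std::ostringstream os;
  os << seq;
  return os.str();
}

// Usage check, for a populated and for an empty sequence.
inline void check_formatting()
{
  assert(format_sequence(std::integer_sequence<int, 1, 2, 3>{}) == "{ 1 2 3 }");
  assert(format_sequence(std::integer_sequence<int>{}) == "{ }");
}
```

The trade-off is that the array-free version is a little harder to read, which may be why a static array is the more common first choice.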

Concatenating sequences

An important problem to solve is that of concatenating sequences. It will be needed in quick sort itself, since it concatenates the sorted sub sequences.

First an introduction of the concat template, accompanied by an alias for making life easier.
template <typename ... T>
struct concat;

template <typename ... T>
using concat_t = typename concat<T...>::type;
The alias concat_t<T...> is a convenience. Instead of having to write using a = typename concat<T...>::type, you just write using a = concat_t<T...>.

In some ways I dislike the naming convention (I'd like it reversed), but it follows that of the C++14 standard library, and that is not unimportant. Regardless, I argue that the latter is not just less typing, but is easier to read too.

Now for the partial specializations.

As so often in template meta programming, the solution is recursive. First, concatenating a single integer sequence is the trivial base case.
template <typename T, T ... ts>
struct concat<std::integer_sequence<T, ts...> >
{
  using type = std::integer_sequence<T, ts...>;
};
In some ways this is even unnecessarily complicated. You can say that concatenating one type (any type) is that type itself. That would work, but I choose to make partial specializations for std::integer_sequence<> only, since it makes it slightly easier to catch stupid programming mistakes. It also future proofs concat, should I ever feel a need to concatenate other types.

The general case, concatenating two integer sequences and some unknown tail, is the same as concatenating the combination of the two integer sequences with the tail.
template <typename T, T... s1, T... s2, typename ... Tail>
struct concat<std::integer_sequence<T, s1...>,
              std::integer_sequence<T, s2...>,
              Tail...>
{
  using type = concat_t<std::integer_sequence<T, s1..., s2...>, Tail...>;
};
Note how the convenience alias concat_t is useful even in the implementation of concat itself.

Let's see if this works, using a simple program:
int main()
{
  using s1 = std::integer_sequence<int, 1, 2, 3>;
  using s2 = std::integer_sequence<int, 4, 5, 6>;
  using s3 = std::integer_sequence<int, 7, 8, 9>;
  std::cout << concat_t<s1, s2, s3>{} << '\n';
}
Note that even though the output stream operator for std::integer_sequence<> doesn't use the value itself, a value is nonetheless needed, so an instance is created.

The output of running the program is:
{ 1 2 3 4 5 6 7 8 9 }
so that is good.
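Since all the information is in the type, the same check can also be made without running anything. The following sketch repeats the concat definitions from above so that it stands alone, and uses static_assert with std::is_same to compare the result against the expected type. The alias names digits_a, digits_b and digits_c are hypothetical, chosen just for this example.

```cpp
#include <type_traits>
#include <utility>

// concat as defined above, repeated to keep the example self-contained.
template <typename ... T>
struct concat;

template <typename ... T>
using concat_t = typename concat<T...>::type;

template <typename T, T ... ts>
struct concat<std::integer_sequence<T, ts...>>
{
  using type = std::integer_sequence<T, ts...>;
};

template <typename T, T ... s1, T ... s2, typename ... Tail>
struct concat<std::integer_sequence<T, s1...>,
              std::integer_sequence<T, s2...>,
              Tail...>
{
  using type = concat_t<std::integer_sequence<T, s1..., s2...>, Tail...>;
};

using digits_a = std::integer_sequence<int, 1, 2, 3>;
using digits_b = std::integer_sequence<int, 4, 5, 6>;
using digits_c = std::integer_sequence<int, 7, 8, 9>;

// Compilation fails here if the concatenation yields any other type.
static_assert(std::is_same<concat_t<digits_a, digits_b, digits_c>,
                           std::integer_sequence<int, 1, 2, 3, 4, 5, 6, 7, 8, 9>>::value,
              "three sequences are joined in order");

// Empty sequences are fine as operands too.
static_assert(std::is_same<concat_t<std::integer_sequence<int>, digits_a>,
                           digits_a>::value,
              "concatenating with an empty sequence changes nothing");
```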

Partitioning

Quick sort works by partitioning the sequence into the values less than a selected pivot element and the rest, and then recursively sorting those partitions. So partitioning a sequence into two sequences based on a predicate is important.

Here too, the solution is recursive. The base case of partitioning an empty sequence is trivial:
template <template <typename V, V> class pred, typename T>
struct partition;

template <template <typename V, V> class pred, typename T>
struct partition<pred, std::integer_sequence<T>>
{
  using incl_type = std::integer_sequence<T>;
  using excl_type = std::integer_sequence<T>;
};
After all, no elements of the empty sequence are selected by the predicate, and none are rejected by it either.

The general case is not much more difficult:
template <template <typename V, V> class pred, typename T, T t, T ... ts>
struct partition<pred, std::integer_sequence<T, t, ts...>>
{
  using incl = std::integer_sequence<T, t>;
  using excl = std::integer_sequence<T>;

  using tail = partition<pred, std::integer_sequence<T, ts...> >;

  static const bool outcome = pred<T, t>::value;
  using incl_type = concat_t<std::conditional_t<outcome, incl, excl>,
                             typename tail::incl_type>;
  using excl_type = concat_t<std::conditional_t<outcome, excl, incl>,
                             typename tail::excl_type>;
};
As always in C++ template meta programming, using alias names for things is a must to keep the code anywhere near readable.

Above, tail is the result of the partitioning of the rest of the values. std::conditional_t<selector, TrueType, FalseType> results in either TrueType or FalseType depending on the boolean value of the selector.

If we assume that the outcome of the predicate on t (the first element in the sequence) is true, then incl_type becomes the concatenation of std::integer_sequence<T, t> and incl_type from tail, otherwise it becomes the concatenation of the empty sequence and incl_type from tail.

Since this implementation builds both the included sequence and the excluded sequence in parallel, the convention of a convenience alias partition_t<> is of little use. It would be possible to make a filter template instead, that just provides a sequence of selected elements, but then filtering would have to be done twice, the second time with the inverted predicate, and even though we're in compile time computation, we care about performance and avoid two-pass implementations if we can, right?
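For comparison, the two-pass alternative mentioned above could look roughly like the following. This is only a sketch of the idea, not code from the 2011 version: filter, filter_t and inverted are hypothetical names, and concat is repeated from earlier so that the example stands alone.

```cpp
#include <type_traits>
#include <utility>

// concat as defined earlier, repeated to keep the example self-contained.
template <typename ... T> struct concat;
template <typename ... T> using concat_t = typename concat<T...>::type;

template <typename T, T ... ts>
struct concat<std::integer_sequence<T, ts...>>
{
  using type = std::integer_sequence<T, ts...>;
};

template <typename T, T ... s1, T ... s2, typename ... Tail>
struct concat<std::integer_sequence<T, s1...>,
              std::integer_sequence<T, s2...>, Tail...>
{
  using type = concat_t<std::integer_sequence<T, s1..., s2...>, Tail...>;
};

// Hypothetical single-result filter: keeps only the elements selected by pred.
template <template <typename V, V> class pred, typename T>
struct filter;

template <template <typename V, V> class pred, typename T>
using filter_t = typename filter<pred, T>::type;

template <template <typename V, V> class pred, typename T>
struct filter<pred, std::integer_sequence<T>>
{
  using type = std::integer_sequence<T>;
};

template <template <typename V, V> class pred, typename T, T t, T ... ts>
struct filter<pred, std::integer_sequence<T, t, ts...>>
{
  using head = std::conditional_t<pred<T, t>::value,
                                  std::integer_sequence<T, t>,
                                  std::integer_sequence<T>>;
  using type = concat_t<head, filter_t<pred, std::integer_sequence<T, ts...>>>;
};

// Hypothetical adapter producing the inverted predicate for the second pass.
template <template <typename V, V> class pred>
struct inverted
{
  template <typename T, T v>
  struct type : std::integral_constant<bool, !pred<T, v>::value> {};
};

template <typename T, T v>
struct is_odd : std::integral_constant<bool, (v & 1) != 0> {};

using seq = std::integer_sequence<int, 1, 2, 3, 4, 5, 6>;

// Two passes over the same input: once with is_odd, once with its inverse.
static_assert(std::is_same<filter_t<is_odd, seq>,
                           std::integer_sequence<int, 1, 3, 5>>::value, "");
static_assert(std::is_same<filter_t<inverted<is_odd>::type, seq>,
                           std::integer_sequence<int, 2, 4, 6>>::value, "");
```

Here filter_t does get a convenience alias, since it has a single result, but every element is visited and tested twice to obtain both halves.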

Let's see how this partitioning works, with another simple program:
1
2
3
4
5
6
7
8
9
template <typename T, T v>
struct is_odd : std::integral_constant<bool, (v & 1) != 0> {};

int main()
{
  using seq = std::integer_sequence<int, 1, 2, 3, 4, 5, 6>;
  using elems = partition<is_odd, seq>;
  std::cout << elems::incl_type{} << ' ' << elems::excl_type{} << '\n';
}
Lines 1-2 define the simple predicate that tells if an integral value is odd. Line 7 partitions the provided sequence into those selected by the predicate and those rejected by it.

The result of running the program is:
{ 1 3 5 } { 2 4 6 }
so again, it looks OK.

Quick sort

Given the availability of concat and partition, quicksort becomes rather simple to implement. First the typical introduction of the template:
template <template <typename U, U, U> class compare, typename T>
struct quicksort;

template <template <typename U, U, U> class compare, typename T>
using quicksort_t = typename quicksort<compare, T>::type;
Since quicksort only has one result, the convenience alias quicksort_t becomes handy. compare is a binary predicate accepting two values of a given type.

As so often, the base case is trivial.
template <template <typename U, U, U> class compare, typename T>
struct quicksort<compare, std::integer_sequence<T>>
{
  using type = std::integer_sequence<T>;
};
After all, the result of sorting an empty sequence is always an empty sequence.

The general case, a partial specialization of quicksort with a binary predicate compare and an std::integer_sequence<> of at least one element, is surprisingly simple too:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
template <template <typename U, U, U> class compare,
          typename T, T t, T ... ts>
struct quicksort<compare, std::integer_sequence<T, t, ts...>>
{
  template <typename V, V v>
  using pred = compare<V, v, t>;

  using partitions = partition<pred, std::integer_sequence<T, ts...>>;

  using incl_seq = typename partitions::incl_type;
  using excl_seq = typename partitions::excl_type;
  using type = concat_t<quicksort_t<compare, incl_seq>,
                        std::integer_sequence<T, t>,
                        quicksort_t<compare, excl_seq> >;
};
Lines 5-6 define an interesting alias template. It makes a unary predicate from the binary predicate compare and the first element of the sequence, t. So, the first element becomes the pivot element to partition the rest of the elements around, and partition requires a unary predicate as its selection criterion. As an example, if compare is "less than", and the first element t is 8, then pred becomes "is less than 8".

Line 8 is where this partitioning is done.

Lines 10-11 are just short hand conveniences to make lines 12-14 readable.

Lines 12-14 are straight from the text book: the result of quick sort is the concatenation of quick sort on the partition of elements included by the predicate, the pivot element, and the result of quick sort on the partition of elements excluded by the predicate.

Another simple program shows the result:

template <typename T, T lh, T rh>
struct less : std::integral_constant<bool, (lh < rh)> {};

int main()
{
  using seq = std::integer_sequence<int, 32, 8, 4, 1, 7, 3, 99, 101, 5>;
  std::cout << quicksort_t<less, seq>{} << '\n';
}
The output from running this program is unsurprisingly:
{ 1 3 4 5 7 8 32 99 101 }
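
Because compare is a template parameter, the same machinery should sort in other orders as well. The sketch below collects the pieces from this post in condensed form and checks a descending sort at compile time. The comparator greater_than is a hypothetical addition for this example.

```cpp
#include <type_traits>
#include <utility>

// concat, partition and quicksort as defined in this post, condensed.
template <typename ... T> struct concat;
template <typename ... T> using concat_t = typename concat<T...>::type;

template <typename T, T ... ts>
struct concat<std::integer_sequence<T, ts...>>
{ using type = std::integer_sequence<T, ts...>; };

template <typename T, T ... s1, T ... s2, typename ... Tail>
struct concat<std::integer_sequence<T, s1...>,
              std::integer_sequence<T, s2...>, Tail...>
{ using type = concat_t<std::integer_sequence<T, s1..., s2...>, Tail...>; };

template <template <typename V, V> class pred, typename T>
struct partition;

template <template <typename V, V> class pred, typename T>
struct partition<pred, std::integer_sequence<T>>
{
  using incl_type = std::integer_sequence<T>;
  using excl_type = std::integer_sequence<T>;
};

template <template <typename V, V> class pred, typename T, T t, T ... ts>
struct partition<pred, std::integer_sequence<T, t, ts...>>
{
  using incl = std::integer_sequence<T, t>;
  using excl = std::integer_sequence<T>;
  using tail = partition<pred, std::integer_sequence<T, ts...>>;
  static const bool outcome = pred<T, t>::value;
  using incl_type = concat_t<std::conditional_t<outcome, incl, excl>,
                             typename tail::incl_type>;
  using excl_type = concat_t<std::conditional_t<outcome, excl, incl>,
                             typename tail::excl_type>;
};

template <template <typename U, U, U> class compare, typename T>
struct quicksort;

template <template <typename U, U, U> class compare, typename T>
using quicksort_t = typename quicksort<compare, T>::type;

template <template <typename U, U, U> class compare, typename T>
struct quicksort<compare, std::integer_sequence<T>>
{ using type = std::integer_sequence<T>; };

template <template <typename U, U, U> class compare,
          typename T, T t, T ... ts>
struct quicksort<compare, std::integer_sequence<T, t, ts...>>
{
  template <typename V, V v>
  using pred = compare<V, v, t>;
  using partitions = partition<pred, std::integer_sequence<T, ts...>>;
  using type = concat_t<quicksort_t<compare, typename partitions::incl_type>,
                        std::integer_sequence<T, t>,
                        quicksort_t<compare, typename partitions::excl_type>>;
};

// Hypothetical comparator for this example: sorts larger values first.
template <typename T, T lh, T rh>
struct greater_than : std::integral_constant<bool, (lh > rh)> {};

using input = std::integer_sequence<int, 32, 8, 4, 1, 7, 3, 99, 101, 5>;

static_assert(std::is_same<quicksort_t<greater_than, input>,
                           std::integer_sequence<int, 101, 99, 32, 8, 7, 5, 4, 3, 1>>::value,
              "descending order");
```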

A word on performance

Everyone who has studied algorithms knows that it's poor practice to partition the elements around the first element for quick sort, since it gives sorting an already sorted sequence O(n²) complexity. For any deterministic element selection, there is a vulnerable family of sequences, so to protect against this, the selection of the pivot element for partitioning should be random. Just two days ago, a solution to this was shown by Matt Bierner in his blog post "Compile Time Pseudo-Random Number Generator".  I leave it as an exercise for someone else to improve on the quick sort implementation.
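
To make the quadratic behaviour concrete, here is a small sketch that counts how many times the predicate would be evaluated when the first element is always the pivot. It is an ordinary C++14 constexpr function rather than a template meta program, and int_seq, max_len and comparisons are hypothetical names used only for this illustration.

```cpp
#include <cstddef>

constexpr std::size_t max_len = 16;  // arbitrary capacity for this sketch

// A fixed-capacity list of ints that can be used in constant expressions.
struct int_seq
{
  int values[max_len];
  std::size_t size;
};

// Number of element-versus-pivot comparisons made when the first element is
// the pivot and the remaining elements are split into "below" and "rest".
constexpr std::size_t comparisons(int_seq s)
{
  if (s.size < 2) return 0;
  int_seq below{};
  int_seq rest{};
  for (std::size_t i = 1; i < s.size; ++i)
  {
    if (s.values[i] < s.values[0])
      below.values[below.size++] = s.values[i];
    else
      rest.values[rest.size++] = s.values[i];
  }
  return (s.size - 1) + comparisons(below) + comparisons(rest);
}

// Already sorted input: 6 + 5 + 4 + 3 + 2 + 1 comparisons, i.e. n(n-1)/2.
static_assert(comparisons(int_seq{{1, 2, 3, 4, 5, 6, 7}, 7}) == 21, "");

// The same values in an order that splits evenly at every level.
static_assert(comparisons(int_seq{{4, 2, 6, 1, 3, 5, 7}, 7}) == 10, "");
```

For seven elements the difference is small, but the sorted case grows as n(n-1)/2, which for the compile time version also means correspondingly deeper template recursion.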

Wrap up

The readability of this code is vastly improved over the 2011 original, and it's shorter too. Using alias templates does not just save typing, but also removes the need for many transformation templates. Performance is also improved over the 2011 original, which used two pass partitioning using a filter.

The utter uselessness of compile time quicksort, however, is not threatened.

Tuesday, January 6, 2015

Sequence Control with the Trompeloeil C++ Mocking Framework


As previously introduced, the Trompeloeil C++ framework is a new mocking framework for C++14. If you're not at all familiar with Trompeloeil, you may want to read the introductory post first.


Quick Recap

Before diving in to the topic of sequence control, here comes a brief recapitulation of Trompeloeil.

You need to define a mock class, and you do that as you define any class, but with mock implementations of the interesting functions.

class DataReader
{
public:
  virtual ~DataReader() = default;
  virtual bool available() const = 0;
  virtual size_t appendReadBytes(size_t desiredLength, std::vector<uint8_t>& data) = 0;
};

class ReaderMock : public DataReader
{
public:
  MAKE_CONST_MOCK0(available, bool());
  MAKE_MOCK2(appendReadBytes, size_t(size_t, std::vector<uint8_t>&));
};
Mock classes typically inherit from interfaces but they are not required to, and they typically do not have data members, but they are allowed to. MAKE_MOCKn(name, sig) creates a mock implementation of a member function with n parameters. MAKE_CONST_MOCKn is the same, but for const member functions. The first parameter to both is the name of the member function, and the second is the signature.

In tests, you let your test subject interact with the mock instances, which you place expectations on.

In this post, I will use the term expectation to mean any of REQUIRE_CALL(), ALLOW_CALL(), FORBID_CALL(), NAMED_REQUIRE_CALL(), NAMED_ALLOW_CALL() and NAMED_FORBID_CALL().


Setting up the Scene

The example used in this post is a MessageDispatcher that reads messages from a DataReader and calls observers with the received messages. The observers can subscribe and unsubscribe to receiving messages.
class MessageDispatcher
{
public:
  class subscriber_handle;
  MessageDispatcher(DataReader& reader);
  void readMessage();
  subscriber_handle subscribe(std::function<void(const std::vector<uint8_t>&)>);
  void unsubscribe(subscriber_handle);
};
In this example, messages consist of 1 header byte containing the length of the message payload, followed by the payload itself. Subscribers expect only the message payload. For different reasons, it is possible that all of a message is not available for reading, in which case the MessageDispatcher must retry reading the tail when data is available. The MessageDispatcher is expected to continue reading messages as long as DataReader says data is available.
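
To make the protocol concrete, here is a minimal sketch of how a dispatcher along these lines might read messages. It is not the implementation under test: SketchDispatcher and ChunkedReader are hypothetical names, unsubscription is left out, and the retry policy (ask again for the missing tail, give up if nothing arrives) is an assumption made to keep the example short.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

using bytes = std::vector<std::uint8_t>;

class DataReader  // the interface from the recap above
{
public:
  virtual ~DataReader() = default;
  virtual bool available() const = 0;
  virtual std::size_t appendReadBytes(std::size_t desiredLength, bytes& data) = 0;
};

// Hypothetical hand-written reader: serves a fixed byte stream and hands out
// at most max_chunk bytes per call, to mimic partially available messages.
class ChunkedReader : public DataReader
{
public:
  ChunkedReader(bytes stream, std::size_t max_chunk)
    : stream_(std::move(stream)), max_chunk_(max_chunk) {}
  bool available() const override { return pos_ < stream_.size(); }
  std::size_t appendReadBytes(std::size_t desiredLength, bytes& data) override
  {
    const std::size_t n =
      std::min({desiredLength, max_chunk_, stream_.size() - pos_});
    data.insert(data.end(), stream_.begin() + pos_, stream_.begin() + pos_ + n);
    pos_ += n;
    return n;
  }
private:
  bytes stream_;
  std::size_t max_chunk_;
  std::size_t pos_ = 0;
};

class SketchDispatcher
{
public:
  using callback = std::function<void(const bytes&)>;
  explicit SketchDispatcher(DataReader& reader) : reader_(reader) {}
  void subscribe(callback cb) { subscribers_.push_back(std::move(cb)); }

  void readMessage()
  {
    do
    {
      bytes header;
      if (reader_.appendReadBytes(1U, header) != 1U) return;
      assert(header.size() == 1U);
      const std::size_t length = header[0];  // header byte = payload length
      bytes payload;
      while (payload.size() < length)        // retry until the tail has arrived
      {
        const auto got = reader_.appendReadBytes(length - payload.size(), payload);
        if (got == 0) return;  // assumption: give up rather than wait
      }
      for (const auto& cb : subscribers_) cb(payload);
    } while (reader_.available());
  }
private:
  DataReader& reader_;
  std::vector<callback> subscribers_;
};

// Reads two messages ("cde" and "f") that arrive at most 2 bytes at a time.
inline std::vector<bytes> demo()
{
  ChunkedReader reader({3, 'c', 'd', 'e', 1, 'f'}, 2);
  SketchDispatcher dispatcher(reader);
  std::vector<bytes> received;
  dispatcher.subscribe([&](const bytes& m) { received.push_back(m); });
  dispatcher.readMessage();
  return received;
}
```

A hand-written reader like this works for a happy-path check, but it says nothing about the order in which the dispatcher makes its calls, which is what the mocks and sequence objects in the rest of this post are for.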

The observers are also made as simple mocks:

class ObserverMock
{
public:
  MAKE_CONST_MOCK1(message, void(const std::vector<uint8_t>&));
};


Default Sequencing

By default, expectations on different mock objects and different member functions are unrelated and any ordering is equally good, as long as all expectations are met by the end of their surrounding scope. Several expectations on the same member function for the same mock object are tried in reverse order of creation until a match is found. This is often useful, since you can state a default in a wide scope, and place specific expectations in narrow local scopes.
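
The "reverse order of creation" rule can be pictured with a toy model. The code below is plain C++ and has nothing to do with how Trompeloeil is actually implemented; expectation, match_call and demo_expectations are hypothetical names, and an expectation is reduced to a label plus a predicate on a single int argument.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Toy model only: a label and a predicate on the call's argument.
struct expectation
{
  std::string name;
  std::function<bool(int)> matches;
};

// Returns the label of the expectation that handles `arg`, searching from the
// most recently created one back to the oldest. Empty string if none match.
inline std::string match_call(const std::vector<expectation>& created, int arg)
{
  for (auto it = created.rbegin(); it != created.rend(); ++it)
  {
    assert(it->matches && "every expectation needs a predicate");
    if (it->matches(arg)) return it->name;
  }
  return {};
}

// A wide default created first, and a narrow special case created later.
inline std::vector<expectation> demo_expectations()
{
  return {
    {"default: any argument",   [](int)   { return true; }},
    {"specific: argument is 1", [](int a) { return a == 1; }}
  };
}
```

With these two expectations, a call with argument 1 is handled by the specific one because it was created last, and any other argument falls through to the default.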

Here's a first simple test for receiving one message and forwarding it to its observer.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
TEST(a_received_message_is_forwarded_to_a_single_subscriber)
{
  using trompeloeil::_;

  // the players
  ReaderMock        reader;
  MessageDispatcher dispatcher(reader);
  ObserverMock      obs;

  // setting up the scene
  dispatcher.subscribe([&](auto& m) { obs.message(m); });

  const std::vector<uint8_t> msg{'c', 'd', 'e'};

  REQUIRE_CALL(reader, appendReadBytes(1U, _))
    .WITH(_2.empty())
    .LR_SIDE_EFFECT(_2.push_back(msg.size()))
    .RETURN(1U);
  REQUIRE_CALL(reader, appendReadBytes(msg.size(), _))
    .WITH(_2.empty())
    .LR_SIDE_EFFECT(_2 = msg)
    .LR_RETURN(msg.size());
  REQUIRE_CALL(reader, available())
    .RETURN(false);
  REQUIRE_CALL(obs, message(_))
    .LR_WITH(_1 == msg);

  // action!
  dispatcher.readMessage();
}
In this example, there is little sequence control. There are two expectations on reader.appendReadBytes(), on lines 15 and 19, meaning that when a call arrives, the one on line 19 is tried first and if it does not match the parameters, the one on line 15 is tried.

A reasonable implementation of MessageDispatcher will pass, but so will a number of faulty ones. What we want is to ensure that the MessageDispatcher reads the length (the expectation on line 15), and then continues with reading the payload, and finishes with checking if more data is available. The expectation on the observer is not really a problem since the implementation would be unlikely to call it with the correct message unless it has read it properly. The order between the call to the observer and reader.available() is irrelevant.


Constrained Sequences

A fixed version with constraints on the matching sequence of expectations could look like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
TEST(a_received_message_is_forwarded_to_a_single_subscriber)
{
  using trompeloeil::_;

  // the players
  ReaderMock        reader;
  MessageDispatcher dispatcher(reader);
  ObserverMock      obs;

  // setting up the scene
  dispatcher.subscribe([&](auto& m) { obs.message(m); });

  const std::vector<uint8_t> msg{'c', 'd', 'e'};

  trompeloeil::sequence seq;
  REQUIRE_CALL(reader, appendReadBytes(1U, _))
    .WITH(_2.empty())
    .IN_SEQUENCE(seq)
    .LR_SIDE_EFFECT(_2.push_back(msg.size()))
    .RETURN(1U);
  REQUIRE_CALL(reader, appendReadBytes(msg.size(), _))
    .WITH(_2.empty())
    .IN_SEQUENCE(seq)
    .LR_SIDE_EFFECT(_2 = msg)
    .LR_RETURN(msg.size());
  REQUIRE_CALL(reader, available())
    .IN_SEQUENCE(seq)
    .RETURN(false);
  REQUIRE_CALL(obs, message(_))
    .LR_WITH(_1 == msg);

  // action!
  dispatcher.readMessage();
}
The sequence object created on line 15 is used to impose a strict order on the matched expectations (lines 18, 23 and 27). Now only the expectation for obs.message() on line 29 is unconstrained; the other three must occur in the stated order, or a violation is reported.


Interleaved Sequences

Let's up the ante and verify that several messages can be read in one go, and that an observer can unsubscribe in the message member function. This is considerably longer, but despair not, an explanation follows.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
TEST(an_observer_can_unsubscribe_in_the_callback_mid_message_processing)
{
  using trompeloeil::_;

  // the players
  ReaderMock        reader;
  MessageDispatcher dispatcher(reader);
  ObserverMock      obs1, obs2, obs3;

  // setting up the scene
  dispatcher.subscribe([&](auto &m){ obs1.message(m); });
  auto i2 = dispatcher.subscribe([&](auto &m){ obs2.message(m);});
  dispatcher.subscribe([&](auto &m){ obs3.message(m); });

  trompeloeil::sequence read_seq, obs1_seq, obs2_seq, obs3_seq;

  // first message
  const std::vector<uint8_t> msg1{'c', 'd', 'e'};

  REQUIRE_CALL(reader, appendReadBytes(1U, _))
    .WITH(_2.empty())
    .IN_SEQUENCE(read_seq)
    .LR_SIDE_EFFECT(_2.push_back(msg1.size()))
    .RETURN(1U);
  REQUIRE_CALL(reader, appendReadBytes(msg1.size(), _))
    .WITH(_2.empty())
    .IN_SEQUENCE(read_seq, obs1_seq, obs2_seq, obs3_seq)
    .LR_SIDE_EFFECT(_2 = msg1)
    .LR_RETURN(msg1.size());
  REQUIRE_CALL(obs1, message(_))
    .LR_WITH(_1 == msg1)
    .IN_SEQUENCE(obs1_seq);
  REQUIRE_CALL(obs2, message(_))
    .LR_WITH(_1 == msg1)
    .IN_SEQUENCE(obs2_seq)
    .LR_SIDE_EFFECT(dispatcher.unsubscribe(i2));
  REQUIRE_CALL(obs3, message(_))
    .LR_WITH(_1 == msg1)
    .IN_SEQUENCE(obs3_seq);
  REQUIRE_CALL(reader, available())
    .IN_SEQUENCE(read_seq, obs1_seq, obs2_seq, obs3_seq)
    .RETURN(true); // continue reading messages

  // second message
  const std::vector<uint8_t> msg2{'f'};

  REQUIRE_CALL(reader, appendReadBytes(1U, _))
    .WITH(_2.empty())
    .IN_SEQUENCE(read_seq)
    .LR_SIDE_EFFECT(_2.push_back(msg2.size()))
    .RETURN(1U);
  REQUIRE_CALL(reader, appendReadBytes(msg2.size(), _))
    .WITH(_2.empty())
    .IN_SEQUENCE(read_seq, obs1_seq, obs2_seq, obs3_seq)
    .LR_SIDE_EFFECT(_2 = msg2)
    .LR_RETURN(msg2.size());
  REQUIRE_CALL(obs1, message(_))
    .LR_WITH(_1 == msg2)
    .IN_SEQUENCE(obs1_seq);
  REQUIRE_CALL(obs3, message(_))
    .LR_WITH(_1 == msg2)
    .IN_SEQUENCE(obs3_seq);
  REQUIRE_CALL(reader, available())
    .IN_SEQUENCE(read_seq, obs1_seq, obs2_seq, obs3_seq)
    .RETURN(false); // no more data

  // action!
  dispatcher.readMessage();
}
The first interesting thing is the many sequence objects defined on line 15. While we still don't want to impose any specific order between the different subscribers, we want to ensure that the two messages are delivered to the observers in order, and immediately after having been read.

Lines 30-39 say that each observer is expected to be called, but since they have different sequence objects, any order between them is OK. However, the expectation for reading the payload with appendReadBytes() on lines 25-29 also refers to each of the sequence objects obs1_seq, obs2_seq and obs3_seq, meaning that the observers may not be called until after the message has been read.

Line 41 says that reader.available() must be called after the message has been read, since it refers to read_seq, which the expectations on appendReadBytes() on lines 20-29 also do. Line 41 also refers to each of obs1_seq, obs2_seq and obs3_seq, meaning that although there is no restriction on which observer gets called in which order, all of them must have been called before reader.available() is called.

The second section on lines 44-65 is more or less the same, but since obs2 unsubscribes on line 36, the only remaining references to obs2_seq are in the expectations on the reader.
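
One way to reason about which call orders such interleaved sequences allow is a toy model where a call is accepted only if it is the next pending step in every sequence that mentions it. The sketch below models the first message only, in plain C++. It is not how Trompeloeil works internally, it does not check that all expectations are eventually met, and the step names and the functions first_message_model, try_call and accepts are all hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy model: a sequence is an ordered list of step names plus a cursor.
struct sequence_model
{
  std::vector<std::string> steps;
  std::size_t next;
};

// read_seq plus one sequence per observer, mirroring the first message.
inline std::vector<sequence_model> first_message_model()
{
  return {
    {{"read_length", "read_payload", "available"}, 0},  // read_seq
    {{"read_payload", "obs1", "available"}, 0},         // obs1_seq
    {{"read_payload", "obs2", "available"}, 0},         // obs2_seq
    {{"read_payload", "obs3", "available"}, 0}          // obs3_seq
  };
}

// Accepts `step` only if it is next in every sequence that mentions it, and
// then advances those sequences.
inline bool try_call(std::vector<sequence_model>& seqs, const std::string& step)
{
  for (const auto& s : seqs)
  {
    assert(s.next <= s.steps.size());
    const bool mentioned =
      std::find(s.steps.begin(), s.steps.end(), step) != s.steps.end();
    const bool is_next = s.next < s.steps.size() && s.steps[s.next] == step;
    if (mentioned && !is_next) return false;
  }
  for (auto& s : seqs)
    if (s.next < s.steps.size() && s.steps[s.next] == step) ++s.next;
  return true;
}

// True if every call in `calls` is accepted, in the given order.
inline bool accepts(const std::vector<std::string>& calls)
{
  auto seqs = first_message_model();
  for (const auto& c : calls)
    if (!try_call(seqs, c)) return false;
  return true;
}
```

In this model the observers can be called in any order relative to each other, for example accepts({"read_length", "read_payload", "obs3", "obs1", "obs2", "available"}) holds, while calling an observer before the payload has been read, or available before all three observers have been called, is rejected.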

A graphic representation of allowed sequences may be viewed as follows:

[Figure: graph of the allowed call sequences]


Wrap up

By default, calls to unrelated mock objects and to unrelated member functions on a mock object have no restrictions on the order of matched calls, while expectations on the same member function of the same mock object are tried in reverse order of creation until a match is found.

Sequence objects are a powerful tool both for ensuring that calls to unrelated objects happen in a required sequence, and for imposing a more readable sequencing of calls matching several expectations on the same member function of the same mock object.

Using several sequence objects, you can describe interleaved sequences that may, but need not, have common points. This allows you to restrict the test exactly as much as necessary, but also to relax it as much as desired, to allow for flexibility in implementation (for example, you typically don't want to impose an order on objects visited from a hash table).