Hi,

("A", 1, ...)

("A", 3, ...)

("B", 6, ...)

I am trying to aggregate values by key in timestamp order, and I believe that spilling to disk is changing the order of the data fed into the combiner.

I have some timeseries data that is of the form: ("key", "date", "other data")

Partition 1

("A", 2, ...)

("B", 4, ...)

("A", 1, ...)

("A", 4, ...)

("A", 3, ...)

("B", 6, ...)

which I then partition by key and sort within each partition:

Partition 1

("A", 1, ...)

("A", 2, ...)

("A", 3, ...)

("A", 4, ...) Partition 2

("B", 4, ...)

("B", 6, ...)

If I run a combineByKey with the same partitioner, the items for each key are fed into the ExternalAppendOnlyMap in the correct order. However, if the map spills, the time slices are written to disk as multiple partial combiners, and when it's time to merge the spilled combiners for each key, they can be combined in the wrong order.

For example, if during a groupByKey, [("A", 1, ...), ("A", 2, ...)] and [("A", 3, ...), ("A", 4, ...)] are spilled separately, the combiners can be merged in the wrong order, producing [("A", 3, ...), ("A", 4, ...), ("A", 1, ...), ("A", 2, ...)], which violates the invariant that all the values for A are passed to the combiners in order.
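To make the failure mode concrete, here is a small Python simulation (hypothetical names, not Spark's actual code path): each spill holds a partial combiner built in order, and merging the two spills' combiners in the wrong order produces exactly the bad sequence above.

```python
# Simulate an order-sensitive combineByKey across spills (not Spark's real code).
def create_combiner(v):
    return [v]

def merge_value(combiner, v):
    combiner.append(v)
    return combiner

def merge_combiners(c1, c2):
    # Order-sensitive: concatenates, assuming c1's values precede c2's.
    return c1 + c2

def build_partial(records):
    """Build one spill's partial combiner from in-order records."""
    combiner = create_combiner(records[0][1])
    for _, v in records[1:]:
        combiner = merge_value(combiner, v)
    return combiner

# Two spills for key "A", each internally in timestamp order.
spill_1 = [("A", 1), ("A", 2)]
spill_2 = [("A", 3), ("A", 4)]

c1, c2 = build_partial(spill_1), build_partial(spill_2)

merge_combiners(c1, c2)  # [1, 2, 3, 4] -- spills merged in the right order
merge_combiners(c2, c1)  # [3, 4, 1, 2] -- spills merged in the wrong order
```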

I'm not an expert, but I suspect this is because we use a heap ordered by key hash when iterating, which doesn't retain the order in which the combiners were spilled. Perhaps we could order the mergeHeap by (hash_key, spill_index), where spill_index is incremented each time we spill? Then we would pop and merge the combiners for each key in spill order, resulting in [("A", 1, ...), ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].
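Here is a rough Python sketch of the (hash_key, spill_index) ordering I have in mind (purely illustrative, not ExternalAppendOnlyMap's actual internals): each spilled stream is tagged with the index of the spill pass that produced it, so the merge heap always pops a key's earliest spill first.

```python
import heapq

# Each spill is a list of (key, partial_combiner) pairs, tagged with the
# index of the spill pass that produced it (hypothetical sketch).
spills = [
    (0, [("A", [1, 2]), ("B", [4])]),  # first spill
    (1, [("A", [3, 4]), ("B", [6])]),  # second spill
]

# Heap ordered by (key_hash, spill_index): for a given key, the earliest
# spill's combiner is always popped, and hence merged, first.
heap = []
for spill_index, entries in spills:
    for key, combiner in entries:
        heapq.heappush(heap, (hash(key), spill_index, key, combiner))

merged = {}
while heap:
    _, _, key, combiner = heapq.heappop(heap)
    # Concatenation is order-sensitive, but the heap guarantees spill order.
    merged[key] = merged.get(key, []) + combiner

# merged == {"A": [1, 2, 3, 4], "B": [4, 6]}
```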

Thanks in advance for the help! If there is already a way to do this in Spark 1.2, can someone point it out to me?

Best,

Justin