I have searched in the issues and found no similar issues.
Describe the proposal
The block id encodes the task attempt id, the partition id, and the sequence number of a shuffle block. Block data are stored along with this block id, while some of this information (the task attempt id) is also stored explicitly. For reading, interfaces use the block id, the partition id encoded in the block id, and explicit task attempt ids to identify relevant blocks. Reading blocks explicitly by their block ids therefore implicitly reads particular (partition id, task attempt id, sequence number) tuples.
Further, the block id is 63 bits long, while the task attempt id and the partition id are each 31 bits long. This poses a conflict, because the 63 bits have to be split between the task attempt id, the partition id and the sequence number: if the first two used their full 31 bits, only a single bit (63 - 2 × 31 = 1) would remain for the sequence number. In some situations one of these fields does not get enough bits, which kills an application using the shuffle service.
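To make the conflict concrete, the following Java sketch packs the three fields into one long under a caller-chosen layout and rejects any value that does not fit. The class name `PackedBlockId`, the field order and the bit widths passed in are hypothetical and only illustrate the trade-off; they are not the shuffle service's actual layout or defaults.

```java
// Hypothetical sketch: packs (sequenceNo, partitionId, taskAttemptId) into a single long
// using caller-chosen bit widths that together may not exceed 63 bits.
final class PackedBlockId {
    static long pack(long sequenceNo, long partitionId, long taskAttemptId,
                     int seqBits, int partitionBits, int taskBits) {
        if (seqBits < 0 || partitionBits < 0 || taskBits < 0
                || seqBits + partitionBits + taskBits > 63) {
            throw new IllegalArgumentException("invalid layout: fields must share at most 63 bits");
        }
        checkFits("sequenceNo", sequenceNo, seqBits);
        checkFits("partitionId", partitionId, partitionBits);
        checkFits("taskAttemptId", taskAttemptId, taskBits);
        // layout from most to least significant bits: sequenceNo | partitionId | taskAttemptId
        return (sequenceNo << (partitionBits + taskBits))
                | (partitionId << taskBits)
                | taskAttemptId;
    }

    // a value wider than its field would overlap the neighbouring field, so reject it
    private static void checkFits(String field, long value, int bits) {
        if (value < 0 || (value >>> bits) != 0) {
            throw new IllegalArgumentException(
                    field + " = " + value + " does not fit into " + bits + " bits");
        }
    }
}
```

For example, with 1 bit for the sequence number and 31 bits each for the partition id and the task attempt id, `pack(2, 0, 0, 1, 31, 31)` throws, since sequence number 2 already needs two bits. Giving the sequence number more bits necessarily takes them away from one of the other two fields.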
The block id should be refactored to simplify interfaces, to make reading blocks explicit, and to avoid potential situations where bits are insufficient.
The block id is effectively a triple of (task attempt id, partition id, sequence number), truncated and encoded in a long.
Since each block id is stored together with its task attempt id, we can make the block id a triple of (task index, partition id, sequence number), where the task index is the map index. As blocks are read given the set of task attempt ids that wrote the desired map indices, the task attempt id no longer needs to be encoded in the block id, which reduces its bit footprint.
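To illustrate why the map index is sufficient, here is a minimal Java sketch that assumes each stored block is kept as a pair of block id and task attempt id. Two attempts of the same map task (e.g. a retried attempt) write the same block id, and the reader tells them apart only through the set of accepted task attempt ids. `AttemptFilter`, `StoredBlock` and `select` are hypothetical names.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: the block id carries only the map index, while the full
// task attempt id is stored next to each block.
final class AttemptFilter {
    record BlockId(int mapIndex, int partitionId, int sequenceNo) {}

    // a block as stored: its block id plus the task attempt id that wrote it
    record StoredBlock(BlockId blockId, long taskAttemptId) {}

    /** Keeps only blocks written by one of the accepted task attempts. */
    static List<StoredBlock> select(List<StoredBlock> stored, Set<Long> acceptedTaskAttemptIds) {
        return stored.stream()
                .filter(block -> acceptedTaskAttemptIds.contains(block.taskAttemptId()))
                .collect(Collectors.toList());
    }
}
```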
The structure of the block id should then be made explicit:
BlockId(int mapIndex, int partitionId, int sequenceNo)
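A minimal Java sketch of this explicit structure, written as a record (Java 16+), which also provides the `equals`/`hashCode` needed when block ids are kept in large sets. The non-negativity check in the compact constructor is an assumption added for illustration; the proposal only fixes the three fields.

```java
// Hypothetical sketch (Java 16+ record): each field keeps the full non-negative int range,
// so nothing has to be truncated to fit a shared 63-bit budget.
record BlockId(int mapIndex, int partitionId, int sequenceNo) {
    BlockId {
        // assumption for illustration: all three ids are non-negative
        if (mapIndex < 0 || partitionId < 0 || sequenceNo < 0) {
            throw new IllegalArgumentException("block id fields must be non-negative: "
                    + mapIndex + ", " + partitionId + ", " + sequenceNo);
        }
    }
}
```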
The block id is often used in large sets, e.g.
the set of all block ids written by a task attempt (register shuffle result)
the set of all block ids that constitute a partition (reading blocks)
A task attempt can register its blocks using a Map<Integer, Integer>, which maps a partition id to the number of blocks written for that partition (#1399). This should also reduce the memory footprint of storing registered block ids on the shuffle server (or on the Spark driver, #1538).
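A minimal Java sketch of this registration, assuming (as in #1399) that the sequence numbers of a task's blocks for one partition run from 0 to blockNum - 1. It shows that the compact Map<partitionId, blockNum> carries the same information as the full set of block ids, which can be re-derived on demand. `ShuffleResultRegistration`, `blockNums` and `expand` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a task attempt registers Map<partitionId, blockNum> instead of
// listing every block id it wrote.
final class ShuffleResultRegistration {
    record BlockId(int mapIndex, int partitionId, int sequenceNo) {}

    /** Counts the blocks written per partition (sequence numbers assumed to be 0..n-1). */
    static Map<Integer, Integer> blockNums(List<BlockId> writtenBlocks) {
        Map<Integer, Integer> blockNums = new HashMap<>();
        for (BlockId block : writtenBlocks) {
            blockNums.merge(block.partitionId(), 1, Integer::sum);
        }
        return blockNums;
    }

    /** Re-derives the block ids of one map index from its registration. */
    static List<BlockId> expand(int mapIndex, Map<Integer, Integer> blockNums) {
        List<BlockId> blocks = new ArrayList<>();
        blockNums.forEach((partitionId, blockNum) -> {
            for (int seq = 0; seq < blockNum; seq++) {
                blocks.add(new BlockId(mapIndex, partitionId, seq));
            }
        });
        return blocks;
    }
}
```

Instead of one entry per block, the server (or driver) stores one integer per (task, partition) pair.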
Reading blocks should be as easy as reading the blocks written by a set of task attempt ids whose block ids match a Map<Integer, Map<Integer, Integer>>, which maps a partition id to the task indices and their number of blocks. Since map indices and partition ids each form a range, this information can be stored efficiently as an array blockNums[partitionRangeIndex][mapRangeIndex].
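A minimal Java sketch of the dense array, assuming the reader asks for a contiguous partition range [startPartition, endPartition) and a contiguous map index range [startMapIndex, endMapIndex), so that both indices can be stored relative to the range start. `ExpectedBlockNums`, `register`, `blockNum` and `totalBlocks` are hypothetical names.

```java
import java.util.Map;

// Hypothetical sketch: expected number of blocks for a read of partitions
// [startPartition, endPartition) written by map indices [startMapIndex, endMapIndex).
final class ExpectedBlockNums {
    private final int startPartition;
    private final int startMapIndex;
    // blockNums[partitionRangeIndex][mapRangeIndex], indices relative to the range starts
    private final int[][] blockNums;

    ExpectedBlockNums(int startPartition, int endPartition, int startMapIndex, int endMapIndex) {
        this.startPartition = startPartition;
        this.startMapIndex = startMapIndex;
        this.blockNums = new int[endPartition - startPartition][endMapIndex - startMapIndex];
    }

    /** Stores one map task's registration (partition id -> number of blocks); mapIndex must be in range. */
    void register(int mapIndex, Map<Integer, Integer> partitionBlockNums) {
        int mapRangeIndex = mapIndex - startMapIndex;
        partitionBlockNums.forEach((partitionId, blockNum) -> {
            int partitionRangeIndex = partitionId - startPartition;
            // partitions outside the requested range are not needed by this reader
            if (partitionRangeIndex >= 0 && partitionRangeIndex < blockNums.length) {
                blockNums[partitionRangeIndex][mapRangeIndex] = blockNum;
            }
        });
    }

    int blockNum(int partitionId, int mapIndex) {
        return blockNums[partitionId - startPartition][mapIndex - startMapIndex];
    }

    /** Total number of blocks the reader expects for the whole range. */
    long totalBlocks() {
        long total = 0;
        for (int[] row : blockNums) {
            for (int n : row) {
                total += n;
            }
        }
        return total;
    }
}
```

Compared with a nested map, the array needs no boxing and no per-entry objects, at the cost of storing zeros for (partition, map index) pairs without blocks.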
While reading those blocks, the reader has to maintain, for each partition id and task index, the set of block sequence numbers from 0 to blockNum - 1 (#1399) that have already been read, in order to identify duplicates. This can be done via a Map<Integer, Roaring64NavigableMap> keyed by partition id, where each Roaring64NavigableMap stores the read (map index, sequence number) tuples.
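A dependency-free Java sketch of this duplicate check. It uses `java.util.HashSet<Long>` as a stand-in for `Roaring64NavigableMap` (from the RoaringBitmap library) and packs each (map index, sequence number) tuple into one long, with the map index in the high 32 bits; that packing and the names `ReadBlockTracker`, `key`, `markRead` and `isComplete` are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: tracks which (mapIndex, sequenceNo) tuples were already read
// per partition, so that duplicate blocks can be skipped.
final class ReadBlockTracker {
    // partition id -> packed (mapIndex, sequenceNo) tuples; a Roaring64NavigableMap
    // would hold the same longs in compressed form
    private final Map<Integer, Set<Long>> readBlocks = new HashMap<>();

    /** Packs two non-negative ints: map index in the high 32 bits, sequence number in the low 32 bits. */
    static long key(int mapIndex, int sequenceNo) {
        return ((long) mapIndex << 32) | (sequenceNo & 0xFFFFFFFFL);
    }

    /** Returns true if the block is read for the first time, false if it is a duplicate. */
    boolean markRead(int partitionId, int mapIndex, int sequenceNo) {
        return readBlocks
                .computeIfAbsent(partitionId, p -> new HashSet<>())
                .add(key(mapIndex, sequenceNo));
    }

    /** True once all sequence numbers 0..blockNum-1 of this map index and partition were read. */
    boolean isComplete(int partitionId, int mapIndex, int blockNum) {
        Set<Long> read = readBlocks.getOrDefault(partitionId, Set.of());
        for (int seq = 0; seq < blockNum; seq++) {
            if (!read.contains(key(mapIndex, seq))) {
                return false;
            }
        }
        return true;
    }
}
```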
The BufferSegment already stores both the block id (long) and the task attempt id (long). Reworking the block id increases the size of BufferSegment by only 32 bits: the 64-bit block id turns into three 32-bit ints (96 bits).
Primary objective:
Any possible task attempt id, partition id and sequence number is supported.
Benefits:
block id fields are no longer truncated
the concept of a block id layout and its configuration can be removed
block id fields are accessible on both the server and the client side
the id helper that provides access to the partition id can be removed
storing shuffle result registrations on the server side no longer requires a number-of-bitmaps argument
redundancies in BufferSegment are reduced
Task list
Register shuffle result using Map<partitionId, blockNums>
Replace taskAttemptId with mapIndex in block id
Use BlockId instead of long everywhere
Turn BlockId(long blockId) into BlockId(int mapIndex, int partitionId, int sequenceNo)