tsort operator


WebSphere DataStage provides a sort operator, tsort, that you can use to sort the records of a data set. The tsort operator can run as either a sequential or a parallel operator, and its execution mode determines its action:

Sequential mode: The tsort operator executes on a single processing node to sort an entire data set. On completion, the records of the data set are sorted completely.

Parallel mode: The tsort operator executes on multiple processing nodes in your system. On completion, the records within each partition of a data set are sorted. This type of sort is called a partition sort. Typically, you use a parallel tsort operator as part of a series of operators that requires sorted partitions. For example, you can combine a sort operator with an operator that removes duplicate records from a data set. After the partitions of a data set are sorted, duplicate records in a partition are adjacent.
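In osh terms (the engine's scripting interface), the difference between the two modes might look like the sketch below. The data set and key names are invented for illustration, and the [seq] annotation, which requests sequential execution for an operator, is assumed to be available as documented for osh:

# Parallel (the default): each partition of in.ds is sorted independently
osh "tsort -key custName < in.ds > sorted.ds"

# Sequential: a single processing node sorts the entire data set
osh "tsort -key custName [seq] < in.ds > sorted.ds"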

To perform a parallel sort, you insert a partitioner in front of the tsort operator. This lets you control how the records of a data set are partitioned before the sort. For example, you could hash partition records by a name field, so that all records whose name field begins with the same one-, two-, or three-letter sequence are assigned to the same partition.
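As a sketch of that idea in osh (field and data set names invented): hash partitioning on the name field guarantees that all records with the same name value land in the same partition, so a partition sort is enough to make duplicates adjacent:

# hash partition on name, then sort each partition on the same key;
# records with equal name values become adjacent within a partition
osh "hash -key name < in.ds | tsort -key name > out.ds"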

When you set the environment variable APT_DUMP_SCORE and then run a job, a text representation of the score (a report) is written to the job's log. The following report comes from a fairly small job:

main_program: This step has 10 datasets:
ds0: {op0[1p] (sequential PacifBaseMCES)
eOther(APT_ModulusPartitioner { key={ value=MBR_SYS_ID }
})<>eCollectAny
op1[4p] (parallel RemDups.IndvIDs_in_Sort)}
ds1: {op1[4p] (parallel RemDups.IndvIDs_in_Sort)
[pp] eSame=>eCollectAny
op2[4p] (parallel RemDups)}
ds2: {op2[4p] (parallel RemDups)
[pp] eSame=>eCollectAny
op6[4p] (parallel buffer(0))}
ds3: {op3[1p] (sequential PacifGalaxyMember)
eOther(APT_ModulusPartitioner { key={ value=MBR_SYS_ID }
})<>eCollectAny
op4[4p] (parallel IndvIdJoin.toIndvIdJoin_Sort)}
ds4: {op4[4p] (parallel IndvIdJoin.toIndvIdJoin_Sort)
eOther(APT_HashPartitioner { key={ value=MBR_SYS_ID }
})#>eCollectAny
op5[4p] (parallel inserted tsort operator {key={value=MBR_SYS_ID, subArgs={asc}}}(0) in IndvIdJoin)}
ds5: {op5[4p] (parallel inserted tsort operator {key={value=MBR_SYS_ID, subArgs={asc}}}(0) in IndvIdJoin)
[pp] eSame=>eCollectAny
op7[4p] (parallel APT_JoinSubOperatorNC in IndvIdJoin)}
ds6: {op6[4p] (parallel buffer(0))
[pp] eSame=>eCollectAny
op7[4p] (parallel APT_JoinSubOperatorNC in IndvIdJoin)}
ds7: {op7[4p] (parallel APT_JoinSubOperatorNC in IndvIdJoin)
[pp] eAny=>eCollectAny
op8[4p] (parallel APT_TransformOperatorImplV22S14_ETLTek_HP37FMember_PMR64262_Test1_SplitTran2 in SplitTran2)}
ds8: {op8[4p] (parallel APT_TransformOperatorImplV22S14_ETLTek_HP37FMember_PMR64262_Test1_SplitTran2 in SplitTran2)
eSame=>eCollectAny
op9[4p] (parallel buffer(1))}
ds9: {op9[4p] (parallel buffer(1))
>>eCollectOther(APT_SortedMergeCollector { key={ value=MBR_SYS_ID,
subArgs={ asc }
}
})
op10[1p] (sequential APT_RealFileExportOperator in HP37_OvaWestmember_extract_dat)}
It has 11 operators:
op0[1p] {(sequential PacifBaseMCES)
on nodes (
node1[op0,p0]
)}
op1[4p] {(parallel RemDups.IndvIDs_in_Sort)
on nodes (
node1[op1,p0]
node2[op1,p1]
node3[op1,p2]
node4[op1,p3]
)}
op2[4p] {(parallel RemDups)
on nodes (
node1[op2,p0]
node2[op2,p1]
node3[op2,p2]
node4[op2,p3]
)}
op3[1p] {(sequential PacifGalaxyMember)
on nodes (
node2[op3,p0]
)}
op4[4p] {(parallel IndvIdJoin.toIndvIdJoin_Sort)
on nodes (
node1[op4,p0]
node2[op4,p1]
node3[op4,p2]
node4[op4,p3]
)}
op5[4p] {(parallel inserted tsort operator {key={value=MBR_SYS_ID, subArgs={asc}}}(0) in IndvIdJoin)
on nodes (
node1[op5,p0]
node2[op5,p1]
node3[op5,p2]
node4[op5,p3]
)}
op6[4p] {(parallel buffer(0))
on nodes (
node1[op6,p0]
node2[op6,p1]
node3[op6,p2]
node4[op6,p3]
)}
op7[4p] {(parallel APT_JoinSubOperatorNC in IndvIdJoin)
on nodes (
node1[op7,p0]
node2[op7,p1]
node3[op7,p2]
node4[op7,p3]
)}
op8[4p] {(parallel APT_TransformOperatorImplV22S14_ETLTek_HP37FMember_PMR64262_Test1_SplitTran2 in SplitTran2)
on nodes (
node1[op8,p0]
node2[op8,p1]
node3[op8,p2]
node4[op8,p3]
)}
op9[4p] {(parallel buffer(1))
on nodes (
node1[op9,p0]
node2[op9,p1]
node3[op9,p2]
node4[op9,p3]
)}
op10[1p] {(sequential APT_RealFileExportOperator in HP37_OvaWestmember_extract_dat)
on nodes (
node2[op10,p0]
)}

It runs 35 processes on 4 nodes.
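That process count is simply the sum of the operators' degrees of parallelism listed above: op0, op3, and op10 run sequentially with one partition each, and the remaining eight operators each run with four partitions, so 3 x 1 + 8 x 4 = 35 processes spread across the 4 logical nodes.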

The dump score report can also tell you when the parallel engine has inserted an operator based on its internal analysis of each operator's requirements. For example, Join stages require sorted input, but the GUI does not require you to supply the sort details. The engine recognizes that a sort is required and supplies one itself. You can see this behavior in the following example:

op5[4p] {(parallel inserted tsort operator {key={value=MBR_SYS_ID, subArgs={asc}}}(0) in IndvIdJoin)
on nodes (
node1[op5,p0]
node2[op5,p1]
node3[op5,p2]
node4[op5,p3]
)}

In the example, tsort is the name of the inserted sort operator. As part of this insertion, the data is also repartitioned, using a hash partitioner on the same key (MBR_SYS_ID) as the sort:

ds4: {op4[4p] (parallel IndvIdJoin.toIndvIdJoin_Sort)
eOther(APT_HashPartitioner { key={ value=MBR_SYS_ID }
})#>eCollectAny
op5[4p] (parallel inserted tsort operator {key={value=MBR_SYS_ID, subArgs={asc}}}(0) in IndvIdJoin)}
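If you prefer to state these requirements explicitly rather than rely on the engine, you can supply the partitioner and the sort yourself on each join input. A rough osh sketch, with invented data set names:

# partition and sort a join input on the join key up front, so the
# engine finds its requirement already met and has no tsort to insert
osh "hash -key MBR_SYS_ID < members.ds | tsort -key MBR_SYS_ID > members_sorted.ds"

The engine also honors the environment variables APT_NO_SORT_INSERTION and APT_NO_PART_INSERTION, which suppress automatic sort and partitioner insertion; if you set them, meeting each operator's input requirements becomes your responsibility.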

All of the partitioning and sorting shown in the preceding ds4 excerpt is there to feed the Join stage (IndvIdJoin):

* * *

ds5: {op5[4p] (parallel inserted tsort operator {key={value=MBR_SYS_ID, subArgs={asc}}}(0) in IndvIdJoin)
[pp] eSame=>eCollectAny
op7[4p] (parallel APT_JoinSubOperatorNC in IndvIdJoin)}

[…]

op7[4p] {(parallel APT_JoinSubOperatorNC in IndvIdJoin)
on nodes (
node1[op7,p0]
node2[op7,p1]
node3[op7,p2]
node4[op7,p3]
)}

* * *
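A final practical note: because the score is plain text in the job log, a quick search is enough to audit even a large job for operators the engine added on its own (the log file name here is illustrative):

# list every operator the engine inserted into the score
grep -n "inserted" job_score_dump.log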