[vsipl++] [patch] HPEC CFAR Detection benchmark
Jules Bergmann
jules at codesourcery.com
Tue May 2 18:29:15 UTC 2006
Don McCoy wrote:
> The attached patch implements the CFAR benchmark. Briefly, this problem
> involves finding targets based on data within a three-dimensional cube
> of 'beam locations', 'range gates' and 'doppler bins'. It does this by
> comparing the signal in a given cell to that of nearby cells in order to
> avoid false-detection of targets. The range gate parameter is varied
> when considering 'nearby' cells. A certain number of guard cells are
> skipped, resulting in a computation that sums the values from two thick
> slices of this data cube (one on either side of the slice for a
> particular range gate). The HPEC PCA Kernel-Level benchmark paper has a
> diagram that shows one cell under consideration. Please refer to it if
> needed.
>
> The algorithm involves these basic steps:
> - compute the squares of all the values in the data cube
> - for each range gate:
> - sum the squares of desired values around the current range gate
> - compute the normalized power for each cell in the slice
> - search for values that exceed a certain threshold
>
> Some of the code relates to boundary conditions (near either end of the
> 'range gates' parameter), but otherwise it follows the above description.
Don,
Excellent description of the benchmark! Can you put it into the file
header as a comment?
>
> For now, the original implementation used get/put (actually operator())
> instead of using subviews and the element-wise operators. Switching
> from one to the other resulted in about a 25% improvement in performance
> for the first set of data (see attached graph). The other sets
> experienced improvement as well, to varying degrees. I'd like to
> consider how we can improve the throughput further. Switching the
> processing order may possibly help. Thoughts are welcome.
General comments:
- Avoid memory allocation/deallocation inside the compute loop.
For example, the 't1' temporary matrix in cfar_find_targets()
is being allocated/deallocated multiple times during a single
test_cfar() call.
You could avoid this by moving cfar_find_targets() and cfar_detect()
into t_cfar_base class, and then defining the temporary
matrices/tensors as member variables.
- When taking a slice of a matrix/tensor, use a subview instead of
copying data.
For example, the 'pow_slice' matrix currently looks like:
Matrix<T> pow_slice = cpow(dom0, 0, dom2);
cfar_find_targets(... pow_slice ...);
for (...)
{
...
pow_slice = cpow(dom0, j, dom2);
cfar_find_targets(... pow_slice ...);
}
As written, pow_slice is a separate matrix holding a copy of
the slice from cpow. Each iteration through the loop, new data
is copied into pow_slice.
The reason that pow_slice is a copy instead of a reference
is because its block type (Dense<2, T>) is different from the block
type returned by 'cpow(...)' (an impl-defined block type that I don't
know off the top of my head, Subset_block maybe?). To have
pow_slice reference the data instead of copying it, it needs
to have the same block type returned from 'cpow()'.
You can use Tensor<T>::submatrix<2>::type to get the right type
(where Tensor<T> is the type of cpow):
typename Tensor<T>::submatrix<2>::type
pow0_slice = cpow(dom0, 0, dom2);
cfar_find_targets(... pow0_slice ...);
for (...)
{
...
typename Tensor<T>::submatrix<2>::type
pow_slice = cpow(dom0, j, dom2);
cfar_find_targets(... pow_slice ...);
}
Note that you can't change what pow_slice refers to after
you create it (i.e. change from '0' to 'j'). That's why this
has 'pow0_slice' and 'pow_slice'.
Of course, you could also do away with the explicit
variable 'pow_slice' altogether:
cfar_find_targets(... cpow(dom0, 0, dom2), ...);
for (...)
{
...
cfar_find_targets(... cpow(dom0, j, dom2), ...);
}
- When iterating through each element in a matrix or tensor,
try to arrange the variables to coincide with the dimension-order.
For example, if you have a 3 by 3 row-major matrix:
Matrix<T> mat(3, 3);
The data will be laid out like this in memory:
Address Matrix element
0 0,0
1 0,1
2 0,2
3 1,0
4 1,1
5 1,2
6 2,0
7 2,1
8 2,2
If you iterate over the elements like so:
for (index_type j=0; j<mat.size(1); ++j)
for (index_type i=0; i<mat.size(0); ++i)
.. use mat(i, j) ..
The sequence of addresses you will be accessing from memory will
look like:
0, 3, 6, 1, 4, 7, 2, 5, 8
This type of sequence makes poor utilization of the cache
because cache lines may be flushed out before all their
elements are used.
For example, the access to location 0 would pull in other
locations in the same cache line, such as location 1. However
if the matrix is large enough, one of the other accesses
(3 or 6 in this case) might flush location 1 out of the cache
before that element is accessed.
Instead, if you iterate over the elements like this:
for (index_type i=0; i<mat.size(0); ++i)
for (index_type j=0; j<mat.size(1); ++j)
.. use mat(i, j) ..
You will get the nice sequence:
0, 1, 2, 3, 4, 5, 6, 7, 8
>
> The benchmark only varies the number of range gates based upon the four
> sets of parameters defined in the HPEC paper. As the workload is
> equally dependent on each of the three dimensions, sweeping the other
> two would not add much value.
>
> Regards,
> + /***********************************************************************
> + Support
> + ***********************************************************************/
> +
> + template <typename T,
> + typename Block1,
> + typename Block2>
> + inline
> + void
> + cfar_find_targets(
> + const_Matrix<T, Block1> sum, // Sum of values in Cfar gates
> + length_type gates, // Total number of Cfar gates used
> + const_Matrix<T, Block2> pow_slice, // A set of squared values of range gates
> + const length_type mu, // Threshold for determining targets
> + Matrix<index_type> targets, // All of the targets detected so far.
> + index_type& next, // the next empty slot in targets
> + const length_type j) // Current range gate number.
> + {
> + if ( next >= targets.size(0) ) // full, nothing to do.
> + return;
> +
> + // Compute the local noise estimate. The inverse is calculated in advance
> + // for efficiency.
> + T inv_gates = (1.0 / gates);
> + Matrix<T> t1 = sum * inv_gates;
> +
> + // Make sure we don't divide by zero! We take advantage of a
> + // reduction function here, knowing the values are positive.
> + Index<const_Matrix<T>::dim> idx;
> + if ( minval(t1, idx) == T() )
Checking that minval == T() is actually overhead. I.e. expanding it out:
// compute minval
for (i = ...)
for (k = ...)
if (t1(i, k) < minval) minval = ...
// set 0
if (val == 0.0)
for (i = ...)
for (k = ...)
if (t1(i, k) == 0.0) ...
In effect it is looping through the matrix multiple times. Just going
through the matrix and looking for zeros should be less expensive.
> + {
> + for ( index_type k = 0; k < t1.size(1); ++k )
> + for ( index_type i = 0; i < t1.size(0); ++i )
Since t1 is row-major, you should reverse the loop nest.
> + if ( t1(i,k) == 0.0 ) {
> + t1(i,k) = Precision_traits<T>::eps;
> + cout << "! " << i << " " << k << endl;
> + }
> + }
> +
> + // Compute the normalized power in the cell
> + Matrix<T> normalized_power = pow_slice / t1;
Instead of using a separate matrix for normalized_power, you could update
t1 in-place:
t1 = pow_slice / t1;
> +
> +
> + // If the normalized power is larger than mu record the coordinates. The
> + // list of target are held in a [N x 3] matrix, with each row containing
> + // the beam location, range gate and doppler bin location of each target.
> + //
> + for ( index_type k = 0; k < t1.size(1); ++k )
> + for ( index_type i = 0; i < t1.size(0); ++i )
> + {
> + if ( normalized_power(i,k) > mu )
> + {
> + targets(next,0) = i;
> + targets(next,1) = j;
> + targets(next,2) = k;
> + if ( ++next == targets.size(0) ) // full, nothing else to do.
> + return;
> + }
> + }
Looking at this entire function (cfar_find_targets), it could benefit
from loop fusion. It has 5 separate loops:
- compute t1
- (find minimum -- we can remove this)
- replace zero values with eps
- compute normalized power
- look for detections.
Fusing these loops together would process each element start-to-finish,
improving temporal locality.
Ignoring any vectorization potential, it would be more efficient to have
a single loop:
for (i = ...)
for (k = ...)
{
T t1 = sum(i, k) * inv_gates;
if (t1 == T())
t1 = eps;
T norm_power = pow_slice(i, k) / t1;
if (norm_power > mu)
... record detection ...
}
It would be nice if we could write a high-level VSIPL++ expression that
did the same thing. Something like this might work:
count = indexbool( pow_slice / max(sum * inv_gates, eps) > mu,
targets(Domain<1>(next, 1, targets.size() - next)));
next += count;
It would be good to compare the performance of the explicit for loop
with the VSIPL++ approach to see if VSIPL++ does a good job.
> + }
> +
> +
> + template <typename T,
> + typename Block>
> + void
> + cfar_detect(
> + Tensor<T, Block> cube,
> + Matrix<index_type> found,
> + length_type cfar_gates,
> + length_type guard_cells,
> + length_type mu)
> + {
> + // Description:
> + // Main computational routine for the Cfar Kernel Benchmark. Determines
> + // targets by finding SNR signal data points that are greater than the
> + // noise threshold mu
> + //
> + // Inputs:
> + // cube: [beams x gates x bins] The radar datacube
> + //
> + // Note: this function assumes that second dimension of input cube C
> + // has length (range gates) greater than 2(cfar gates + guard cells).
> + // If this were not the case, then the parameters of the radar signal
> + // processing would be flawed!
Can you put this comment near the assertion that checks it?
> +
> + length_type beams = cube.size(0);
> + length_type gates = cube.size(1);
> + length_type dbins = cube.size(2);
> + test_assert( 2*(cfar_gates+guard_cells) < gates );
> +
> + Tensor<T> cpow = pow(cube, 2);
> +
> + Domain<1> dom0(beams);
> + Domain<1> dom2(dbins);
> + Matrix<T> sum(beams, dbins, T());
> + for ( length_type lnd = guard_cells; lnd < guard_cells+cfar_gates; ++lnd )
> + sum += cpow(dom0, 1+lnd, dom2);
> +
> + Matrix<T> pow_slice = cpow(dom0, 0, dom2);
> +
> + index_type next_found = 0;
> + cfar_find_targets(sum, cfar_gates, pow_slice, mu, found, next_found, 0);
> +
> + for ( index_type j = 1; j < gates; ++j )
> + {
> + length_type gates_used = 0;
> + length_type c = cfar_gates;
> + length_type g = guard_cells;
> +
You could move this 'if-then-else' statement outside of the loop. This
would result in multiple loops. Since the majority of time is spent in
case 3, keeping cases 1 & 2 and 4 & 5 together would be OK. I.e.:
- loop for cases 1 & 2
- loop for case 3
- loop for cases 4 & 5
> + // Case 1: No cell included on left side of CFAR;
> + // very close to left boundary
> + if ( j < (g + 1) )
> + {
> + gates_used = c;
> + sum += cpow(dom0, j+g+c, dom2) - cpow(dom0, j+g, dom2);
> + }
> + // Case 2: Some cells included on left side of CFAR;
> + // close to left boundary
> + else if ( (j >= (g + 1)) & (j < (g + c + 1)) )
> + {
> + gates_used = c + j - (g + 1);
> + sum += cpow(dom0, j+g+c, dom2) - cpow(dom0, j+g, dom2)
> + + cpow(dom0, j-(g+1), dom2);
> + }
> + // Case 3: All cells included on left and right side of CFAR
> + // somewhere in the middle of the range vector
> + else if ( (j >= (g + c + 1)) & ((j + (g + c)) < gates) )
> + {
> + gates_used = 2 * c;
> + sum += cpow(dom0, j+g+c, dom2) - cpow(dom0, j+g, dom2)
> + + cpow(dom0, j-(g+1), dom2) - cpow(dom0, j-(c+g+1), dom2);
> + }
> + // Case 4: Some cells included on right side of CFAR;
> + // close to right boundary
> + else if ( (j + (g + c) >= gates) & ((j + g) < gates) )
> + {
> + gates_used = c + gates - (j + g);
> + sum += - cpow(dom0, j+g, dom2)
> + + cpow(dom0, j-(g+1), dom2) - cpow(dom0, j-(c+g+1), dom2);
> + }
> + // Case 5: No cell included on right side of CFAR;
> + // very close to right boundary
> + else if (j + g >= gates)
> + {
> + gates_used = c;
> + sum += cpow(dom0, j-(g+1), dom2) - cpow(dom0, j-(c+g+1), dom2);
> + }
> + else
> + {
> + cerr << "Error: fell through if statements in Cfar detection - " <<
> + j << endl;
> + test_assert(0);
> + }
> +
> + pow_slice = cpow(dom0, j, dom2);
> + cfar_find_targets(sum, gates_used, pow_slice, mu, found, next_found, j);
> + }
> + }
> Index: benchmarks/loop.hpp
> ===================================================================
> RCS file: /home/cvs/Repository/vpp/benchmarks/loop.hpp,v
> retrieving revision 1.17
> diff -c -p -r1.17 loop.hpp
> *** benchmarks/loop.hpp 13 Apr 2006 19:21:07 -0000 1.17
> --- benchmarks/loop.hpp 2 May 2006 00:26:12 -0000
> *************** Loop1P::sweep(Functor fcn)
> *** 286,292 ****
>
> float factor = goal_sec_ / time;
> if (factor < 1.0) factor += 0.1 * (1.0 - factor);
> ! loop = (int)(factor * loop);
>
> if (factor >= 0.75 && factor <= 1.25)
> break;
> --- 286,299 ----
>
> float factor = goal_sec_ / time;
> if (factor < 1.0) factor += 0.1 * (1.0 - factor);
> ! if ( loop == (int)(factor * loop) )
> ! break; // Avoid getting stuck when factor ~= 1 and loop is small
> ! else
> ! loop = (int)(factor * loop);
> ! if ( loop == 0 )
> ! loop = 1;
> ! if ( loop == 1 ) // Quit if loop cannot get smaller
> ! break;
I was a little confused by this logic at first, but after considering
it, it seems OK.
I've thought about always starting the loop count at 1 for calibration
and only letting it grow. If the new loop is ever smaller than the old
one, that would end calibration (calibration would also end if 0.75 <=
factor <= 1.25 as currently). Do you think that would work?
>
> if (factor >= 0.75 && factor <= 1.25)
> break;
--
Jules Bergmann
CodeSourcery
jules at codesourcery.com
(650) 331-3385 x705