[vsipl++] [patch] HPEC CFAR Detection benchmark

Jules Bergmann jules at codesourcery.com
Tue May 2 18:29:15 UTC 2006


Don McCoy wrote:
> The attached patch implements the CFAR benchmark.  Briefly, this problem 
> involves finding targets based on data within a three-dimensional cube 
> of 'beam locations', 'range gates' and 'doppler bins'.  It does this by 
> comparing the signal in a given cell to that of nearby cells in order to 
> avoid false-detection of targets.  The range gate parameter is varied 
> when considering 'nearby' cells.  A certain number of guard cells are 
> skipped, resulting in a computation that sums the values from two thick 
> slices of this data cube (one on either side of the slice for a 
> particular range gate).  The HPEC PCA Kernel-Level benchmark paper has a 
> diagram that shows one cell under consideration.  Please refer to it if 
> needed.
> 
> The algorithm involves these basic steps:
>   - compute the squares of all the values in the data cube
>   - for each range gate:
>     - sum the squares of desired values around the current range gate
>     - compute the normalized power for each cell in the slice
>     - search for values that exceed a certain threshold
> 
> Some of the code relates to boundary conditions (near either end of the 
> 'range gates' parameter), but otherwise it follows the above description.

Don,

Excellent description of the benchmark!  Can you put it into the file 
header as a comment?

> 
> The original implementation used get/put (actually operator())
> instead of using subviews and the element-wise operators.  Switching 
> from one to the other resulted in about a 25% improvement in performance 
> for the first set of data (see attached graph).  The other sets 
> experienced improvement as well, to varying degrees.  I'd like to 
> consider how we can improve the throughput further.  Switching the
> processing order may possibly help.  Thoughts are welcome.
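
For reference, the kind of change being described looks roughly like this
(the element-wise version is illustrative only; the subview form and the
names dom0, dom2, sum, cpow come from cfar_detect() below):

	// element-wise get/put version (illustrative):
	for (index_type b = 0; b < beams; ++b)
	  for (index_type d = 0; d < dbins; ++d)
	    sum.put(b, d, sum.get(b, d) + cpow.get(b, j, d));

	// whole-slice subview version (as in the patch):
	sum += cpow(dom0, j, dom2);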

General comments:
  - Avoid memory allocation/deallocation inside the compute loop.
    For example, the 'tl' temporary matrix in cfar_find_targets()
    is being allocated/deallocated multiple times during a single
    test_cfar() call.

    You could avoid this by moving cfar_find_targets() and cfar_detect()
    into t_cfar_base class, and then defining the temporary
    matrices/tensors as member variables.
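
    For example (a rough sketch, not the actual patch; the member name
    'tmp_' and the constructor arguments are made up):

	template <typename T>
	struct t_cfar_base
	{
	  t_cfar_base(length_type beams, length_type dbins)
	    : tmp_(beams, dbins)       // allocated once, up front
	  {}

	  // cfar_find_targets() / cfar_detect() become members and reuse
	  // tmp_ instead of creating a local Matrix<T> on every call.

	  Matrix<T> tmp_;              // scratch for the noise estimate
	};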

  - When taking a slice of a matrix/tensor, use a subview instead of
    copying data.

    For example, the 'pow_slice' matrix currently looks like:

	Matrix<T> pow_slice = cpow(dom0, 0, dom2);
  	cfar_find_targets(... pow_slice ...);
	for (...)
	{
	  ...
	  pow_slice = cpow(dom0, j, dom2);
	  cfar_find_targets(... pow_slice ...);
	}

    As written, pow_slice is a separate matrix holding a copy of
    the slice from cpow.  Each iteration through the loop, new data
    is copied into pow_slice.

    The reason that pow_slice is a copy instead of a reference
    is that its block type (Dense<2, T>) is different from the block
    type returned by 'cpow(...)' (an impl-defined block type that I don't
    know off the top of my head, Subset_block maybe?).  To have
    pow_slice reference the data instead of copying it, it needs
    to have the same block type as the one returned by 'cpow()'.

    You can use Tensor<T>::submatrix<2>::type to get the right type
    (where Tensor<T> is the type of cpow):

	typename Tensor<T>::submatrix<2>::type
	  pow0_slice = cpow(dom0, 0, dom2);
	cfar_find_targets(... pow0_slice ...);
	for (...)
	{
	  ...
	  typename Tensor<T>::submatrix<2>::type
	    pow_slice = cpow(dom0, j, dom2);
	  cfar_find_targets(... pow_slice ...);
	}

    Note that you can't rebind pow_slice to a different slice after
    you create it (i.e. change from '0' to 'j').  That's why this
    example uses both 'pow0_slice' and 'pow_slice'.

    Of course, you could also do away with the explicit
    variable 'pow_slice' altogether:

	cfar_find_targets(... cpow(dom0, 0, dom2), ...);
	for (...)
	{
	  ...
	  cfar_find_targets(... cpow(dom0, j, dom2), ...);
	}

  - When iterating through each element in a matrix or tensor,
    try to arrange the loop variables to coincide with the dimension
    order, so that the innermost loop walks the fastest-varying
    dimension.

    For example, if you have a 3 by 3 row-major matrix:

	Matrix<T> mat(3, 3);

    The data will be laid out like this in memory:

	Address		Matrix element
	0		0,0
	1		0,1
	2		0,2
	3		1,0
	4		1,1
	5		1,2
	6		2,0
	7		2,1
	8		2,2

    If you iterate over the elements like so:

	for (index_type j=0; j<mat.size(1); ++j)
	  for (index_type i=0; i<mat.size(0); ++i)
	    .. use mat(i, j) ..

    The sequence of addresses you will be accessing from memory will
    look like:

	0, 3, 6, 1, 4, 7, 2, 5, 8

    This type of sequence makes poor use of the cache
    because cache lines may be flushed out before all of their
    elements have been used.

    For example, the access to location 0 would pull other
    locations in the same cache line, such as location 1, into the
    cache.  However, if the matrix is large enough, one of the other
    accesses (3 or 6 in this case) might flush location 1 out of the
    cache before that element is accessed.

    Instead, if you iterate over the elements like this:

	for (index_type i=0; i<mat.size(0); ++i)
	  for (index_type j=0; j<mat.size(1); ++j)
	    .. use mat(i, j) ..

    You will get the nice sequence:

	0, 1, 2, 3, 4, 5, 6, 7, 8

> 
> The benchmark only varies the number of range gates based upon the four 
> sets of parameters defined in the HPEC paper.  As the workload is 
> equally dependent on each of the three dimensions, sweeping the other 
> two would not add much value.
> 
> Regards,

> + /***********************************************************************
> +   Support
> + ***********************************************************************/
> + 
> + template <typename T,
> +           typename Block1,
> +           typename Block2> 
> + inline
> + void
> + cfar_find_targets(
> +   const_Matrix<T, Block1> sum,       // Sum of values in Cfar gates
> +   length_type             gates,     // Total number of Cfar gates used
> +   const_Matrix<T, Block2> pow_slice, // A set of squared values of range gates
> +   const length_type       mu,        // Threshold for determining targets
> +   Matrix<index_type>      targets,   // All of the targets detected so far. 
> +   index_type&             next,      // the next empty slot in targets
> +   const length_type       j)         // Current range gate number. 
> + {
> +   if ( next >= targets.size(0) )  // full, nothing to do.
> +     return;
> + 
> +   // Compute the local noise estimate.  The inverse is calculated in advance
> +   // for efficiency.
> +   T inv_gates = (1.0 / gates);
> +   Matrix<T> tl = sum * inv_gates;
> + 
> +   // Make sure we don't divide by zero!  We take advantage of a
> +   // reduction function here, knowing the values are positive.
> +   Index<const_Matrix<T>::dim> idx;
> +   if ( minval(tl, idx) == T() )

Checking whether minval == T() actually adds overhead.  I.e., expanding it out:

	// compute minval
	for (i = ...)
	  for (k = ...)
	    if (tl(i, k) < minval) minval = ...

	// replace zeros with eps
	if (minval == 0.0)
	  for (i = ...)
	    for (k = ...)
	      if (tl(i, k) == 0.0) ...

In effect it loops through the matrix multiple times.  A single pass that
just looks for zeros (and replaces them) should be less expensive.


> +   {
> +     for ( index_type k = 0; k < tl.size(1); ++k )
> +       for ( index_type i = 0; i < tl.size(0); ++i )

Since tl is row-major, you should reverse the loop nest.
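
I.e., something like (the diagnostic output elided):

	for ( index_type i = 0; i < tl.size(0); ++i )
	  for ( index_type k = 0; k < tl.size(1); ++k )
	    if ( tl(i,k) == 0.0 )
	      tl(i,k) = Precision_traits<T>::eps;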

> +         if ( tl(i,k) == 0.0 ) {
> +           tl(i,k) = Precision_traits<T>::eps;
> +           cout << "! " << i << " " << k << endl;
> +         }
> +   }
> + 
> +   // Compute the normalized power in the cell
> +   Matrix<T> normalized_power = pow_slice / tl;

Instead of using a separate matrix for normalized_power, you could update
tl in-place:

	tl = pow_slice / tl;

> + 
> + 
> +   // If the normalized power is larger than mu record the coordinates.  The
> +   // list of target are held in a [N x 3] matrix, with each row containing 
> +   // the beam location, range gate and doppler bin location of each target. 
> +   //
> +   for ( index_type k = 0; k < tl.size(1); ++k )
> +     for ( index_type i = 0; i < tl.size(0); ++i )
> +     {
> +       if ( normalized_power(i,k) > mu )
> +       {
> +         targets(next,0) = i;
> +         targets(next,1) = j;
> +         targets(next,2) = k;
> +         if ( ++next == targets.size(0) )  // full, nothing else to do.
> +           return;
> +       }
> +     }

Looking at this entire function (cfar_find_targets), it could benefit 
from loop fusion.  It has 5 separate loops:

  - compute tl
  - (find minimum -- we can remove this)
  - replace zero values with eps
  - compute normalized power
  - look for detections.

Fusing these loops together would process each element start-to-finish, 
improving temporal locality.

Ignoring any vectorization potential, it would be more efficient to have 
a single loop:

	for (i = ...)
	  for (k = ...)
	  {
	    T tl = sum(i, k) * inv_gates;
	    if (tl == T())
	      tl = eps;
	    T norm_power = pow_slice(i, k) / tl;
	    if (norm_power > mu)
	      ... record detection ...
	  }

It would be nice if we could write a high-level VSIPL++ expression that 
did the same thing.  Something like this might work:

	count = indexbool(pow_slice / max(sum * inv_gates, eps) > mu,
	                  targets(Domain<1>(next, 1, targets.size(0) - next)));
	next += count;

It would be good to compare the performance of the explicit for loop 
with the VSIPL++ approach to see if VSIPL++ does a good job.

> + }
> + 
> + 
> + template <typename T,
> +           typename Block>
> + void
> + cfar_detect(
> +   Tensor<T, Block>   cube,
> +   Matrix<index_type> found,
> +   length_type        cfar_gates,
> +   length_type        guard_cells,
> +   length_type        mu)
> + {
> + // Description:
> + //   Main computational routine for the Cfar Kernel Benchmark. Determines 
> + //   targets by finding SNR signal data points that are greater than the 
> + //   noise threshold mu
> + //
> + // Inputs:
> + //    cube: [beams x gates x bins] The radar datacube
> + //
> + // Note: this function assumes that second dimension of input cube C  
> + // has length (range gates) greater than 2(cfar gates + guard cells).
> + // If this were not the case, then the parameters of the radar signal 
> + // processing would be flawed!

Can you put this comment near the assertion that checks it?


> + 
> +   length_type beams = cube.size(0);
> +   length_type gates = cube.size(1);
> +   length_type dbins = cube.size(2);
> +   test_assert( 2*(cfar_gates+guard_cells) < gates );
> + 
> +   Tensor<T> cpow = pow(cube, 2);
> + 
> +   Domain<1> dom0(beams);
> +   Domain<1> dom2(dbins);
> +   Matrix<T> sum(beams, dbins, T());
> +   for ( length_type lnd = guard_cells; lnd < guard_cells+cfar_gates; ++lnd )
> +     sum += cpow(dom0, 1+lnd, dom2);
> + 
> +   Matrix<T> pow_slice = cpow(dom0, 0, dom2);
> + 
> +   index_type next_found = 0;
> +   cfar_find_targets(sum, cfar_gates, pow_slice, mu, found, next_found, 0);
> + 
> +   for ( index_type j = 1; j < gates; ++j )
> +   {
> +     length_type gates_used = 0;
> +     length_type c = cfar_gates;
> +     length_type g = guard_cells;
> + 

You could move this 'if-then-else' statement outside of the loop by
splitting the range-gate loop into multiple loops.  Since the majority
of the time is spent in case 3, keeping cases 1 & 2 together and
cases 4 & 5 together would be OK.  I.e.:
  - loop for cases 1 & 2
  - loop for case 3
  - loop for cases 4 & 5
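
Roughly (a sketch only; the bounds are my reading of the conditions in
the patch below, with c = cfar_gates and g = guard_cells hoisted out of
the loop, so please double-check them):

	// cases 1 & 2:  1 <= j < g + c + 1
	for (index_type j = 1; j < g + c + 1; ++j)
	  ... update sum, call cfar_find_targets ...

	// case 3:  g + c + 1 <= j  and  j + g + c < gates
	for (index_type j = g + c + 1; j + g + c < gates; ++j)
	  ... update sum, call cfar_find_targets ...

	// cases 4 & 5:  gates - (g + c) <= j < gates
	for (index_type j = gates - (g + c); j < gates; ++j)
	  ... update sum, call cfar_find_targets ...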

> +     // Case 1: No cell included on left side of CFAR; 
> +     // very close to left boundary 
> +     if ( j < (g + 1) ) 
> +     {
> +       gates_used = c;
> +       sum += cpow(dom0, j+g+c, dom2)   - cpow(dom0, j+g, dom2);
> +     }
> +     // Case 2: Some cells included on left side of CFAR;
> +     // close to left boundary 
> +     else if ( (j >= (g + 1)) & (j < (g + c + 1)) )
> +     {
> +       gates_used = c + j - (g + 1);
> +       sum += cpow(dom0, j+g+c, dom2)   - cpow(dom0, j+g, dom2) 
> +            + cpow(dom0, j-(g+1), dom2);
> +     }
> +     // Case 3: All cells included on left and right side of CFAR
> +     // somewhere in the middle of the range vector
> +     else if ( (j >= (g + c + 1)) & ((j + (g + c)) < gates) )
> +     {
> +       gates_used = 2 * c;
> +       sum += cpow(dom0, j+g+c, dom2)   - cpow(dom0, j+g, dom2) 
> +            + cpow(dom0, j-(g+1), dom2) - cpow(dom0, j-(c+g+1), dom2);
> +     }
> +     // Case 4: Some cells included on right side of CFAR;
> +     // close to right boundary
> +     else if ( (j + (g + c) >= gates) & ((j + g) < gates) )
> +     {
> +       gates_used = c + gates - (j + g);
> +       sum +=                           - cpow(dom0, j+g, dom2) 
> +            + cpow(dom0, j-(g+1), dom2) - cpow(dom0, j-(c+g+1), dom2);
> +     }
> +     // Case 5: No cell included on right side of CFAR; 
> +     // very close to right boundary 
> +     else if (j + g >= gates)
> +     {
> +       gates_used = c;
> +       sum += cpow(dom0, j-(g+1), dom2) - cpow(dom0, j-(c+g+1), dom2);
> +     }    
> +     else
> +     {
> +       cerr << "Error: fell through if statements in Cfar detection - " << 
> +         j << endl;
> +       test_assert(0);
> +     }
> + 
> +     pow_slice = cpow(dom0, j, dom2);
> +     cfar_find_targets(sum, gates_used, pow_slice, mu, found, next_found, j);
> +   }
> + }

> Index: benchmarks/loop.hpp
> ===================================================================
> RCS file: /home/cvs/Repository/vpp/benchmarks/loop.hpp,v
> retrieving revision 1.17
> diff -c -p -r1.17 loop.hpp
> *** benchmarks/loop.hpp	13 Apr 2006 19:21:07 -0000	1.17
> --- benchmarks/loop.hpp	2 May 2006 00:26:12 -0000
> *************** Loop1P::sweep(Functor fcn)
> *** 286,292 ****
>   
>       float factor = goal_sec_ / time;
>       if (factor < 1.0) factor += 0.1 * (1.0 - factor);
> !     loop = (int)(factor * loop);
>   
>       if (factor >= 0.75 && factor <= 1.25)
>         break;
> --- 286,299 ----
>   
>       float factor = goal_sec_ / time;
>       if (factor < 1.0) factor += 0.1 * (1.0 - factor);
> !     if ( loop == (int)(factor * loop) )
> !       break;          // Avoid getting stuck when factor ~= 1 and loop is small
> !     else
> !       loop = (int)(factor * loop);
> !     if ( loop == 0 ) 
> !       loop = 1; 
> !     if ( loop == 1 )  // Quit if loop cannot get smaller
> !       break;

I was a little confused by this logic at first, but after considering 
it, it seems OK.

I've thought about always starting the loop count at 1 for calibration
and only letting it grow.  If the new loop count is ever smaller than
the old one, that would end calibration (calibration would also end if
0.75 <= factor <= 1.25, as it does currently).  Do you think that would
work?
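
Roughly (a sketch of the idea, not the current loop.hpp code; the timing
details are elided):

	loop = 1;
	while (1)
	{
	  float time = ...;  // run fcn() 'loop' times and measure
	  float factor = goal_sec_ / time;
	  if (factor >= 0.75 && factor <= 1.25)
	    break;                        // close enough to goal_sec_
	  int new_loop = (int)(factor * loop);
	  if (new_loop <= loop)
	    break;                        // loop count would shrink: stop
	  loop = new_loop;
	}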


>   
>       if (factor >= 0.75 && factor <= 1.25)
>         break;




-- 
Jules Bergmann
CodeSourcery
jules at codesourcery.com
(650) 331-3385 x705


