
Distinct Aggregation Considered Harmful

Distinct aggregation (e.g. select count(distinct key) …) is a SQL language feature that results in some very slow queries.  It’s particularly frustrating that you can take a perfectly efficient query with multiple aggregates, and make that query take forever just by adding a distinct keyword to one of the aggregates.  For instance, it often makes logical sense to compute distinct counts “along the way” while you are computing other aggregates.  Recently, I've seen a number of customer queries like this:

select
      sum(salary),
      max(salary),
      count(employeeid),
      count(distinct employeeid),
      count(distinct salarygrade)

In practice, however, SQL Server 2008 does not compute these distinct aggregates “along the way.” Mixing one or more distinct aggregates with non-distinct aggregates in the same select list, or mixing two or more distinct aggregates, leads to spooling and rereading of intermediate results, which can be more expensive than computing the distinct aggregates in a separate query! In the worst case, such as the fragment of code above, this is largely unavoidable. The SQL Server 2008 query processor cannot compute aggregates on a stream without destroying the stream. So computing the distinct aggregate consumes the stream, which then has to be reproduced (spooled and reread) for the other aggregates.

Fortunately, for select lists with only one distinct aggregate, you can rewrite the input SQL in a way that does not have this problem. SQL Server 2008’s optimizer does not do this rewrite for you. (Disclaimers: all the aggregates have to be algebraic [Gray et al 1996], that is, computable by combining partial aggregates, as count, sum, min, max and avg are. Also, no guarantees are made that the behavior of the rewrite is exactly the same, particularly when there are arithmetic overflows.)

Getting Rid of Distinct Aggregates

The code below shows an example rewrite using the AdventureWorksDW database distributed with SQL Server 2008. The rewrite gives me a 5x speedup on my desktop, even on this small database!

What’s going on is that we break the aggregation into two aggregation steps:

- First, we build an intermediate result called PartialSums. We group together all the values for the distinct aggregate by adding the distinct aggregate’s column to the GROUP BY list. At the same time, we build partial aggregations for all the non-distinct aggregates (in this example, just count(*)).
- Then, in the second step, we use the original GROUP BY list, which in this example is empty, and combine the partial aggregates.

Note how the aggregate functions used depend on the initial aggregates: count becomes count, followed by sum. The distinct count itself becomes a count of the groups, sum(1). In sum(1), the 1 is a constant numeric value, not a column reference.

As far as I can tell, this rewrite is never worse than the spooled plan.  It parallelizes better, uses tempdb better, and does less logical I/O.

set statistics profile on
set statistics time on
go
use adventureworksdw
go

--
-- Use a distinct aggregate and a normal aggregate in the same select list
-- over some complex (one or more joins) derived table
--
with FISinFRS as (
 select * from factinternetsales FIS
  where ProductKey in
   (select ProductKey from FactResellerSales)
)
select
      count(*)                            as CountStar,
      count(distinct ProductKey)          as CountProductKeys
from FISinFRS
go

 

--
-- Now use partial aggregations in a new derived table
-- This is equivalent, but SQL Server 2008 does not do this transformation
-- for you
--
with FISinFRS as (
 select * from factinternetsales FIS
  where ProductKey in
   (select ProductKey from FactResellerSales)
)
, PartialSums as (
  select
      count(*)                            as CountStarPartialCount
  from FISinFRS
  group by ProductKey
)
select
      sum(CountStarPartialCount)          as CountStar,
      sum(1)                              as CountProductKeys
from PartialSums
go
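The equivalence of the two-step rewrite can be checked on a toy table. The sketch below makes several assumptions:

- It uses Python's built-in sqlite3 module as a stand-in for SQL Server, so it says nothing about SQL Server's plans or speed.
- The emp table is hypothetical, modeled on the customer fragment at the top of this post.
- It keeps only one distinct aggregate, since the rewrite requires that.

It also applies the general rule beyond count: sum becomes sum then sum, and max becomes max then max.

```python
import sqlite3

# Hypothetical table modeled on the customer fragment at the top of the post.
conn = sqlite3.connect(":memory:")
conn.execute("create table emp (employeeid integer, salary integer, salarygrade integer)")
conn.executemany(
    "insert into emp values (?, ?, ?)",
    [(1, 100, 1), (2, 200, 1), (3, 300, 2), (4, 300, 2), (5, 500, 3)],
)

# Original form: one distinct aggregate mixed with ordinary aggregates.
ORIGINAL = """
select sum(salary), max(salary), count(*), count(distinct salarygrade)
from emp
"""

# Two-step rewrite: group by the distinct column while computing partial
# aggregates, then combine them (sum -> sum, max -> max, count(*) -> sum).
# count(salarygrade) counts only non-NULL group keys, matching count(distinct),
# and coalesce() keeps count(*) at 0 on empty input instead of NULL.
REWRITE = """
with PartialSums as (
  select salarygrade,
         sum(salary) as SumPartial,
         max(salary) as MaxPartial,
         count(*)    as CountStarPartial
  from emp
  group by salarygrade
)
select sum(SumPartial), max(MaxPartial),
       coalesce(sum(CountStarPartial), 0), count(salarygrade)
from PartialSums
"""

print(conn.execute(ORIGINAL).fetchone())  # (1400, 500, 5, 3)
print(conn.execute(REWRITE).fetchone())   # (1400, 500, 5, 3)
```

Two details differ from the AdventureWorksDW version.

- The outer query counts non-NULL group keys with count(salarygrade) rather than sum(1), because count(distinct ...) ignores NULLs.
- coalesce(..., 0) keeps the count at 0 on empty input, where a bare sum() returns NULL.

In the AdventureWorksDW query, the IN predicate already filters out NULL ProductKey values, so sum(1) only differs from count(distinct ProductKey) when the input is empty.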

 

References

[Gray et al 1996] Data cube: A relational aggregation operator generalizing group-by, cross-tab, and sub-totals (1996), ftp://ftp.research.microsoft.com/pub/tr/tr-95-22.pdf.

 

 

By Marc Friedman, 9/2008

Marc.friedman@donotreply.microsoft.com

 

 

Store Statistics XML in database tables using SQL Traces for further analysis.

Since SQL Server 2005, query plans, as well as the statistics of query execution, can be captured in XML format. SQL Server 2005 also supports XQuery for querying XML documents directly. By combining these two features, users can analyze query plans with queries.

 

However, SQL Server offers no easy way to capture the statistics XML directly into a table. Fortunately, SQL Trace can capture the Showplan XML and statistics XML events into trace files, which can then be loaded into tables.

 

Note: The only limitation is that the xml data type in SQL Server 2005 supports at most 128 levels of nesting. A very complex query plan can exceed this limit. In that case, you have to write client code to parse the plan instead.

 

Here is a small example. It uses SQL Trace to store the statistics XML, then extracts the estimated and actual row counts:

 

/*Using Traces to Capture Statistics XML*/
declare @trace_id int
declare @trace_file nvarchar(200)
declare @on bit
set @on = 1
select @trace_file = 'c:\temp\test_stats_' + cast(newid() as varchar(100))

-- create a file trace; option 2 = TRACE_FILE_ROLLOVER
-- (SQL Server appends the .trc extension to the file name)
exec sp_trace_create @trace_id output,
      2,
      @tracefile=@trace_file

-- event 146 = Showplan XML Statistics Profile, column 1 = TextData;
-- sp_trace_setevent expects @on as a bit, so pass a bit variable
exec sp_trace_setevent @trace_id, 146, 1, @on

-- start
exec sp_trace_setstatus @trace_id, 1

-- test statement.
select * from sys.objects

-- stop
exec sp_trace_setstatus @trace_id, 0

-- close the trace and delete its definition
exec sp_trace_setstatus @trace_id, 2

-- load trace files into table
if object_id('temp_trc') is not null
      drop table temp_trc

select *
into temp_trc
from fn_trace_gettable(@trace_file + '.trc', default)

-- look at the captured stats xml
-- (if several events were captured, @plan keeps the value from the last row read)
declare @plan xml

select @plan=cast(textdata as xml)
from temp_trc
where eventclass = 146;

-- collect the actual and estimated row counts
with XMLNAMESPACES ('http://schemas.microsoft.com/sqlserver/2004/07/showplan' as sql)
select
      ro.relop.value('@NodeId', 'int') NodeId,
      ro.relop.value('@PhysicalOp', 'nvarchar(200)') PhysicalOp,
      ro.relop.value('@LogicalOp', 'nvarchar(200)') LogicalOp,
      (ro.relop.value('@EstimateRows', 'float')
            * (ro.relop.value('@EstimateRewinds', 'float')
                  +  ro.relop.value('@EstimateRebinds', 'float')
                  + 1.0)) EstimateRows,
      case
            when root_actual.ActualRows = 0
                  then null
            else root_actual.ActualRows
      end ActualRows,
      cast(ro.relop.exist('*/sql:RelOp') as bit) IsNotLeaf
from @plan.nodes('//sql:RelOp') as ro(relop)
      cross apply (
            select sum(rti.info.value('@ActualRows', 'float')) ActualRows
            from ro.relop.nodes('sql:RunTimeInformation/sql:RunTimeCountersPerThread') as rti(info)
      ) root_actual;
go

 

The estimated and actual row counts returned by this query are shown below:

 

NodeId  PhysicalOp            LogicalOp             EstimateRows  ActualRows  IsNotLeaf
0       Nested Loops          Left Outer Join       52            52          1
1       Nested Loops          Left Outer Join       52            52          1
2       Filter                Filter                52            52          1
3       Compute Scalar        Compute Scalar        52            NULL        1
4       Clustered Index Scan  Clustered Index Scan  52            52          0
13      Clustered Index Seek  Clustered Index Seek  52.000031     NULL        0
14      Clustered Index Seek  Clustered Index Seek  52            52          0
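If the trace table is analyzed outside the server, the same extraction can be sketched in Python. This is a minimal sketch with these assumptions:

- The statistics XML has already been read out of temp_trc as a string.
- SAMPLE is a made-up, heavily trimmed plan, not real trace output.

There is one deliberate difference from the T-SQL above. An operator whose runtime counters report 0 rows keeps 0 here. Only operators with no RunTimeInformation at all come back as None.

```python
import xml.etree.ElementTree as ET

NS = {"sql": "http://schemas.microsoft.com/sqlserver/2004/07/showplan"}
RELOP = "{%s}RelOp" % NS["sql"]

def relop_rows(plan_xml):
    """Return (NodeId, PhysicalOp, LogicalOp, EstimateRows, ActualRows, IsNotLeaf) per RelOp."""
    rows = []
    for relop in ET.fromstring(plan_xml).iter(RELOP):
        # Estimated rows per execution times the estimated number of executions.
        executions = (float(relop.get("EstimateRewinds", "0"))
                      + float(relop.get("EstimateRebinds", "0")) + 1.0)
        estimate = float(relop.get("EstimateRows", "0")) * executions
        # Sum actual rows over all threads; None when there are no runtime counters.
        counters = relop.findall("sql:RunTimeInformation/sql:RunTimeCountersPerThread", NS)
        actual = sum(float(c.get("ActualRows")) for c in counters) if counters else None
        # Not a leaf if a RelOp sits below this operator's child element.
        is_not_leaf = relop.find("*/sql:RelOp", NS) is not None
        rows.append((int(relop.get("NodeId")), relop.get("PhysicalOp"),
                     relop.get("LogicalOp"), estimate, actual, is_not_leaf))
    return rows

# Made-up, trimmed-down plan for illustration only.
SAMPLE = """<ShowPlanXML xmlns="http://schemas.microsoft.com/sqlserver/2004/07/showplan">
  <RelOp NodeId="0" PhysicalOp="Filter" LogicalOp="Filter"
         EstimateRows="52" EstimateRewinds="0" EstimateRebinds="0">
    <RunTimeInformation>
      <RunTimeCountersPerThread Thread="0" ActualRows="52" />
    </RunTimeInformation>
    <Filter>
      <RelOp NodeId="1" PhysicalOp="Clustered Index Seek" LogicalOp="Clustered Index Seek"
             EstimateRows="1" EstimateRewinds="3" EstimateRebinds="1" />
    </Filter>
  </RelOp>
</ShowPlanXML>"""

for row in relop_rows(SAMPLE):
    print(row)
```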

Index Build strategy in SQL Server - Part 4-2: Offline Serial/Parallel Partitioning (Non-aligned partitioned index build)

Source Partitioned
While the table is partitioned, we may want to change the way it is partitioned when building the new index. For example, by using the same partition function and scheme, the new index can be partitioned on different columns than the original table.

Create table t (c1 int, c2 int) on ps(c2)

…….

Create clustered Index idx_t on t(c1) on ps(c1)

The serial plan looks as follows:
    Index Insert
       |
     Sort
       |
      NL (Nested Loop)
     /    \
 CTS   Scan
CTS is the Constant Table Scan. It produces the source partition IDs one by one and passes each partition ID to the inner side of NL (the lower part in graphical showplan). The inner side of NL scans the corresponding partition and sends the data to the sort iterator. From there, it is exactly the same as the source-not-partitioned scenario. Not surprisingly, the memory and disk space requirements are the same too.

In the case of the parallel plan, we have:
      X (Distribute Streams)
       |
     Index Insert
       |
     Sort
       |
       X (Repartition Streams)
       |
      NL
     /   \
   X    Scan
  /
CTS
The operator above CTS is a Distribute Streams operator, meaning it has one producer and many consumers. There is no parallelism below this operator. Between this lower Distribute Streams and the Repartition Streams, each worker is assigned (Number of Source's Partitions)/(Degree of Parallelism) source partitions. The source is only scanned once in total.

The Repartition Streams operator splits the query plan into two parallelism sections. Between the top-level Distribute Streams and the Repartition Streams, we have a different set of workers than the worker set below Repartition Streams. Each worker is assigned (Number of Target Index Partitions)/(Degree of Parallelism) target partitions. The target index partition information is pushed down to the Repartition Streams which redirects data to different sort tables based on target partition location. The rest, i.e. how the sort and index building works, is the same as the parallel plan in the source non-partitioned case. Again, the memory and disk space requirement is the same as in the source non-partitioned case.
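The row flow through this parallel plan can be simulated in a few lines. This is a sketch, not SQL Server's implementation, and it rests on these assumptions:

- The round-robin assignment policy, the dictionaries standing in for partitions and sort tables, and all function names are hypothetical.
- The workers run one after another rather than concurrently.
- Target partitions follow the RANGE RIGHT function pf with boundaries (1, 100, 1000) from Part 4-1, applied to the index key c1.

```python
from bisect import bisect_right

def range_right_partition(value, boundaries):
    """RANGE RIGHT: a boundary value belongs to the partition on its right."""
    return bisect_right(boundaries, value) + 1

def assign(partition_ids, dop):
    """Hypothetical round-robin policy giving each worker about count/dop partitions."""
    workers = [[] for _ in range(dop)]
    for i, pid in enumerate(sorted(partition_ids)):
        workers[i % dop].append(pid)
    return workers

def parallel_nonaligned_build(source_partitions, target_boundaries, dop):
    # Below Repartition Streams: workers split the *source* partitions,
    # so each source partition is scanned exactly once.
    producers = assign(source_partitions.keys(), dop)
    # Above Repartition Streams: a second set of workers splits the *target* partitions.
    consumers = assign(range(1, len(target_boundaries) + 2), dop)
    owner = {t: w for w, targets in enumerate(consumers) for t in targets}
    sort_tables = [{} for _ in range(dop)]  # per consumer: target partition -> rows
    for source_ids in producers:
        for pid in source_ids:
            for row in source_partitions[pid]:
                target = range_right_partition(row[0], target_boundaries)  # key is c1
                # Repartition Streams routes the row to the worker owning its target partition.
                sort_tables[owner[target]].setdefault(target, []).append(row)
    # Each consumer sorts its tables by the index key before building its b-trees.
    return [{t: sorted(rows, key=lambda r: r[0]) for t, rows in tables.items()}
            for tables in sort_tables]

# Hypothetical (c1, c2) rows in two source partitions.
source = {1: [(500, -5), (5, 0)], 2: [(2000, 50), (0, 60), (100, 70)]}
print(parallel_nonaligned_build(source, [1, 100, 1000], dop=2))
```

With two workers, worker 0 ends up owning target partitions 1 and 3, and worker 1 owns partitions 2 and 4. Rows from both source partitions are routed across that boundary.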

Posted by queryproc

Index Build strategy in SQL Server - Part 4-1: Offline Serial/Parallel Partitioning (Non-aligned partitioned index build)

Recall how the previous posts on index build defined these terms:

- "Aligned" is the case where the base object and the index being built use the same partition scheme.
- "Non-aligned" is the case where the heap and the index use different partition schemes, or where the heap is not partitioned.

In this post, we will talk about the two scenarios of non-aligned partitioned index build: source partitioned and source not partitioned.

Source Not Partitioned
Consider the following query.

Create Partition Function pf (int)
as range right for values (1, 100, 1000)

Create Partition Scheme ps as Partition pf
ALL TO ([PRIMARY])

Create table t (c1 int, c2 int) -- the table is created on the primary filegroup by default

Create clustered Index idx_t on t(c1) on ps(c1) -- non-aligned index build

The serial plan is straightforward.

 Index Insert (write data to the in-build index)
   |
 Sort (order by index key)
   |
 Scan (read data from source)

The sort iterator creates one sort table per target partition. There are four partitions in this example, so we construct four sort tables concurrently.

By default, the sort spills data to the user database. As we mentioned before, we free each extent of a sort table after all of its rows have been copied. This reduces the disk space requirement for each partition from 3*partition size (source + sort table + b-tree) to about 2.2*partition size. Therefore, each filegroup requires 2.2*(size of all partitions that belong to this filegroup) of disk space.

If the user specifies SORT_IN_TEMPDB, all the sort tables reside in tempdb, so we require 2.2*(size of the whole index) of free space in tempdb.

The Index Insert iterator can start building the index only after the sort iterator has finished sorting all the sort tables. Therefore, as many sort tables as there are partitions exist at the same time. Recall that each sort table requires at least 40 pages, so the minimum required memory is #PT*40 pages, where #PT is the number of target partitions.
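The sizing rules in the two paragraphs above reduce to simple arithmetic. The sketch below is a back-of-the-envelope estimate, not how SQL Server computes its memory grant or checks free space. It assumes:

- the approximate 2.2x factor and the 40-page minimum stated in this post;
- 8 KB pages;
- hypothetical partition sizes and filegroup names.

```python
PAGE_KB = 8  # a SQL Server page is 8 KB

def disk_space_mb(partition_sizes_mb, filegroup_of, factor=2.2):
    """Approximate free space each filegroup needs when the sort spills to the user database."""
    need = {}
    for pid, size in partition_sizes_mb.items():
        fg = filegroup_of[pid]
        need[fg] = need.get(fg, 0.0) + factor * size
    return need

def tempdb_space_mb(partition_sizes_mb, factor=2.2):
    """Approximate free tempdb space needed when SORT_IN_TEMPDB is specified."""
    return factor * sum(partition_sizes_mb.values())

def min_sort_memory_kb(partition_count, pages_per_sort_table=40):
    """All sort tables exist at once, and each needs at least 40 pages."""
    return partition_count * pages_per_sort_table * PAGE_KB

sizes = {1: 100, 2: 200, 3: 300, 4: 400}  # hypothetical partition sizes in MB
on_primary = {pid: "PRIMARY" for pid in sizes}
print(disk_space_mb(sizes, on_primary))
print(tempdb_space_mb(sizes))
print(min_sort_memory_kb(len(sizes)))  # 1280
```

For the four-partition example, about 2,200 MB of free space is needed in PRIMARY (or in tempdb with SORT_IN_TEMPDB). At least 160 pages (1,280 KB) of sort memory are also required.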

The parallel plan looks like this:

 X (Exchange)
   |
 Index Insert
   |
 Sort
   |
 Scan

Each worker thread is assigned (Partition Count)/(Degree of Parallelism) partitions. For example, with 4 partitions and 4 worker threads, each worker gets 1 partition. Because partitions can differ in size, this assignment can be skewed. The sort iterator creates one sort table per assigned partition. Each worker scans the whole source once and retrieves the rows that belong to its partition(s). Each retrieved row is inserted into the sort table of the partition it belongs to.

After all the sort tables have been populated, the index builder starts consuming rows from them, one sort table after another, building a b-tree in each partition's filegroup. Currently, workers do not share partitions. Therefore, one worker may finish all of its assigned partitions and sit idle while another worker is still busy inserting rows.
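The parallel source-not-partitioned build and its load skew can be sketched as follows. The sketch makes these assumptions:

- Workers run sequentially here.
- Rows are reduced to their c1 key values.
- Partitions are handed out round-robin. This is a hypothetical policy, since the post only says each worker gets about (Partition Count)/(Degree of Parallelism).

Every worker scans the whole unpartitioned source and keeps only the rows that map to its own partitions under the RANGE RIGHT function pf.

```python
from bisect import bisect_right

def range_right_partition(value, boundaries):
    """RANGE RIGHT: a boundary value belongs to the partition on its right."""
    return bisect_right(boundaries, value) + 1

def build_sort_tables(source_keys, boundaries, dop):
    partition_count = len(boundaries) + 1
    tables_per_worker = {}
    for worker in range(dop):
        # Hypothetical round-robin assignment: about partition_count / dop partitions each.
        mine = [p for p in range(1, partition_count + 1) if (p - 1) % dop == worker]
        tables = {p: [] for p in mine}
        for key in source_keys:  # every worker scans the whole source once
            p = range_right_partition(key, boundaries)
            if p in tables:
                tables[p].append(key)
        # One sorted sort table per assigned partition.
        tables_per_worker[worker] = {p: sorted(rows) for p, rows in tables.items()}
    return tables_per_worker

def rows_per_worker(tables_per_worker):
    """Rows each worker must insert; uneven totals mean some workers sit idle."""
    return {w: sum(len(rows) for rows in tables.values())
            for w, tables in tables_per_worker.items()}

keys = [99, 0, 5, 150, 50, 2000]  # hypothetical c1 values
plan = build_sort_tables(keys, [1, 100, 1000], dop=4)
print(rows_per_worker(plan))  # {0: 1, 1: 3, 2: 1, 3: 1}
```

Worker 1 owns partition 2, which receives three of the six rows. The other three workers finish their single row each and then idle.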

The disk space and memory requirements are exactly the same as the serial plan. This is because in both cases, we cannot start building the index until all the sort tables are populated.

Posted by queryproc

How to Check Whether the Final Query Plan is Optimized for Star Join Queries?

The star join optimization technique is an index-based optimization designed for data warehousing scenarios. It makes optimal use of non-clustered indexes on huge fact tables. The general idea is to use the non-clustered indexes on the fact table to limit the number of rows scanned from it. More details of index-based star join optimization can be found at http://blogs.msdn.com/bi_systems/pages/164502.aspx.

 

The following discussion is based on SQL Server 2005 query plans. 

 

In SQL Server 2005, we put a “StarJoinInfo” element in the Showplan XML to highlight star join optimization. If the query plan contains the “StarJoinInfo” element, then SQL Server has identified this plan as a star join plan, and it definitely is one.
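Checking for the element can be automated. This is a minimal sketch with these assumptions:

- The Showplan XML is available as a string, for example captured with SQL Trace.
- It uses the showplan namespace URI from the statistics XML query earlier on this page.
- The two sample documents are made up.

Only a positive result is conclusive. A missing StarJoinInfo element does not rule out a star join plan.

```python
import xml.etree.ElementTree as ET

SHOWPLAN_NS = "http://schemas.microsoft.com/sqlserver/2004/07/showplan"

def has_star_join_info(showplan_xml):
    """True if a StarJoinInfo element appears anywhere in the Showplan XML."""
    root = ET.fromstring(showplan_xml)
    return root.find(".//{%s}StarJoinInfo" % SHOWPLAN_NS) is not None

# Made-up, heavily trimmed documents for illustration only.
WITH_INFO = ('<ShowPlanXML xmlns="%s"><RelOp NodeId="0">'
             '<StarJoinInfo /></RelOp></ShowPlanXML>' % SHOWPLAN_NS)
WITHOUT_INFO = ('<ShowPlanXML xmlns="%s"><RelOp NodeId="0" />'
                '</ShowPlanXML>' % SHOWPLAN_NS)

print(has_star_join_info(WITH_INFO), has_star_join_info(WITHOUT_INFO))  # True False
```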

 

However, the query optimizer may not detect all star join plans because of restrictions in star join detection. Hence some star join plans won’t have the StarJoinInfo element. This post sheds some light on how to manually detect whether a given query plan is a star join plan.

 

These steps can help you identify what’s NOT a star join plan:

  • First identify your fact table.
  • If you see a clustered index scan (or table scan) on the fact table, then it’s not an index-based star join plan. However, it is still a valid multi-table join plan, which can benefit from multiple-bitmap filter pushdown.

To identify a star join plan, you should:

  • Again, first identify your fact table
  • You should have a single RID lookup or clustered index seek on the fact table
  • Restrictive dimensions (dimension tables with restrictive filters in the query) should be processed before processing the clustered index seek or RID lookup on the fact table. You should find either:
    • A Cartesian product between the dimensions joined with a multi-column index on the fact table.
    • Or a semi-join of the dimensions with some non-clustered indexes on the fact table.
    • Or a join between multiple dimensions.
  • Non-restrictive dimensions are joined later with the fact table.

So the rule of thumb is: to detect whether a given plan uses index-based star join optimization, always look for a seek (or lookup) on the fact table that is driven by joins of dimension tables.
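The checklist above can be written down as a rough heuristic. This is a sketch under strong assumptions:

- The plan is flattened into a hypothetical list of (physical operator, table) pairs in processing order. Real plans are trees.
- Operator names follow those shown in graphical showplan.
- The table names are placeholders.

The heuristic only encodes the rules in this post. A "possible star join" result still needs a human look at the plan.

```python
# Hypothetical simplified plan: (physical operator, table) pairs listed in the
# order the plan processes them. Real plans are trees; this flattening is an assumption.
SCANS = {"Clustered Index Scan", "Table Scan"}
FETCHES = {"Clustered Index Seek", "RID Lookup"}

def classify(plan, fact_table):
    fact_steps = [i for i, (_, table) in enumerate(plan) if table == fact_table]
    # A full scan of the fact table rules out an index-based star join.
    if any(plan[i][0] in SCANS for i in fact_steps):
        return "not an index-based star join"
    # Expect exactly one RID lookup or clustered index seek on the fact table.
    fetches = [i for i in fact_steps if plan[i][0] in FETCHES]
    if len(fetches) != 1:
        return "inconclusive"
    # Dimension work must come before that single fetch from the fact table.
    dims_first = any(table != fact_table for _, table in plan[:fetches[0]])
    return "possible star join" if dims_first else "inconclusive"

star = [("Clustered Index Scan", "DimDate"),
        ("Index Seek", "FactInternetSales"),
        ("RID Lookup", "FactInternetSales"),
        ("Clustered Index Scan", "DimCustomer")]
scan = [("Clustered Index Scan", "FactInternetSales"),
        ("Clustered Index Scan", "DimDate")]
print(classify(star, "FactInternetSales"))  # possible star join
print(classify(scan, "FactInternetSales"))  # not an index-based star join
```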

Hash Warning SQL Profiler Event

One of the less well-known warning events logged to a SQL Profiler trace is the Hash Warning event. Hash Warning events are fired when a hash recursion or hash bailout has occurred during a hashing operation. Both of these situations are less than desirable. They mean that a Hash Join or Hash Aggregate has run out of memory and been forced to spill information to disk during query execution. When a hashing operation spills to disk, this almost always results in slower query performance and can increase space consumption in tempdb.

 

Note that the Hash Warning event needs to be explicitly enabled within SQL Profiler; it is not one of the default set of events. More information on SQL Profiler can be found in SQL Server Books Online.

 

What can be done if you see a lot of Hash Warning events?  The recommended actions are:

 

·         Make sure that statistics exist on the columns that are involved in the hashing operation.  Without statistics, the hashing operation has no way to know how much memory to pre-allocate.

·         Even if statistics do exist, try updating them, as this can give more current information to the hashing operation when it decides how much memory to allocate.

·         Try using a different type of join (this can be done by hinting OPTION(MERGE JOIN) or OPTION(LOOP JOIN)).  Note that forcing a join type does not necessarily guarantee a better execution plan.

·         If all of these fail, you can increase the available memory on the computer.

 

In SQL Profiler, you will see something like the following. Note the SQL:BatchStarting event, followed by a number of Hash Warning events before the batch completes. The SPID that caused the events is also recorded.

 

EventClass          StartTime                SPID
SQL:BatchStarting   2007-02-01 18:53:34.703  51
Hash Warning        2007-02-01 18:53:48.267  51
Hash Warning        2007-02-01 18:53:48.283  51
Hash Warning        2007-02-01 18:53:50.097  51