Parallel Foreign Scan of PostgreSQL

Enterprise PostgreSQL Solutions

Comments are off

Parallel Foreign Scan of PostgreSQL

PostgreSQL Community is working on adding built-in sharding in Postgres and it seems that the community is following the FDW based sharding approach for adding built-in sharding for PG. The Parallel Foreign Scan is a very important part of this approach as it will provide the functionality of executing multiple foreign scans in parallel and really improving the OLAP use case.

I want to touch on how parallel foreign scan may be implemented in Postgres in this blog. Firstly I will discuss about how parallel scan in Postgres works today. In the second part of the blog, I will introduce Andres’s linearized executor design for Postgres to gain the ability to jump among nodes. This is the infrastructure required in order to add parallel foreign scan in Postgres. In the end, I will share my thoughts on parallel foreign scan and on the current parallel scan mechanism.

1.Parallel Scan In PostgreSQL

PostgreSQL has implemented the Parallel Query from pg9.6, it includes Parallel Scan, Parallel Join, Parallel Aggregation. PostgreSQL has improved its support for parallelism over multiple major releases starting from PG 9.6.

1.1 What is Parallel Scan

Let’s talk about Parallel Query first, according to the documentation the description of Parallel Query is that ‘PostgreSQL can devise query plans which can leverage multiple CPUs in order to answer queries faster’.

The underlying idea is that the session asks for several worker processes to use the multiple CPUs to scan data of the same relation. It can speed up the scan but there be efficiency loss because of the parallel progress setup and tuple transferring between processes.

1.2 When Does Parallel Scan work?

Parallel Scan will work when the GUC is enabled and the executor thinks it worth doing a parallel scan as compared to doing a sequence scan, index scan(btree) or bitmap heap scan.

The GUC argument for Parallel scan is :

  • The max_parallel_workers_per_gather should bigger than 0.
max_parallel_workers_per_gather is an GUC argument to define that the max number of workers for a parallel work.
  • There should enough max_worker_processes remain for the scan.
max_worker_processes is the maximum number of background processes that the system can support
  • There should enough max_parallel_workers remain for the scan.
max_parallel_workers is the maximum number of workers that the system can support for parallel operations.
  • parallel_setup_cost and parallel_tuple_cost can be reduced so that the planner will prefer to do a parallel plan. The optimal values of the two arguments depend on the machine.
parallel_setup_cost is to set the planner's estimate of the cost of launching parallel worker processes. The default is 1000.
​
parallel_tuple_cost is to set the planner's estimate of the cost of transferring one tuple from a parallel worker process to another process. The default is 0.1.
  • The user can set force_parallel_mode ‘on’, to force parallelism with no consideration of efficiency.
The user can make force_parallel_mode on when you want to test it so that you can try the parallel without lots of data.

Only sequence scan, index scan, and bitmap heap scan support parallel.

  • Parallel Sequence Scan

We knew that relation data on the hard disk is organized by page, in a Parallel Sequence Scan the workers will pick up pages one by one for a scan in order to implement Parallel Sequence Scan.

Let’s do a test on the Parallel Sequence Scan, I am making changes in the source code in order to demonstrate how parallel sequence scan works.

I changed the source code so that it gives a log when a worker picks up a page, so there will be many log messages in the test results below and I just replace log message with 'Worker XXXXX picked page XXXXXX'. 

I have changed ‘force_parallel_mode’ to on, ‘parallel_setup_cost’ and ‘parallel_tuple_cost’ to 0 for test.

Same as the test in the Parallel Index Scan module.

Create test data and do the test here:

postgres=# create table t2(i int,j int,k varchar);
CREATE TABLE
postgres=# insert into t2 (i,j,k) select generate_series(1,20000000),10,'qazwsxedcr';
INSERT 0 2000000
postgres=# explain analyse select * from t2 where i = 100;
Worker XXXXX picked page XXXXXX
Worker 20672 picked page 127386
Worker 20714 picked page 127387
Worker 20715 picked page 127388
Worker 20672 picked page 127389
Worker 20714 picked page 127390
Worker 20672 picked page 127391
Worker 20715 picked page 127392
    QUERY PLAN                                  
---------------------------------------------------------------------------------------------------
Gather (cost=0.00..231556.04 rows=1 width=19) (actual time=1.862..3842.330 rows=1 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Parallel Seq Scan on t2 (cost=0.00..231556.04 rows=1 width=19) (actual time=2554.697..3833.
        Filter: (i = 100)
        Rows Removed by Filter: 6666666
Planning Time: 0.088 ms
Execution Time: 3842.367 ms
(8 rows)

postgres=#

It shows the progress during the test.

movead@home:~ $ ps -ef|grep postgres:
movead   20658 20656  0 11:06 ?        00:00:00 postgres: checkpointer  
movead   20659 20656  0 11:06 ?        00:00:00 postgres: background writer  
movead   20660 20656  0 11:06 ?        00:00:00 postgres: walwriter  
movead   20661 20656  0 11:06 ?        00:00:00 postgres: autovacuum launcher  
movead   20662 20656  0 11:06 ?        00:00:00 postgres: stats collector  
movead   20663 20656  0 11:06 ?        00:00:00 postgres: logical replication launcher  
movead   20672 20656  2 11:08 ?        00:00:21 postgres: movead postgres [local] EXPLAIN
movead   20714 20656 14 11:24 ?        00:00:00 postgres: parallel worker for PID 20672  
movead   20715 20656 14 11:24 ?        00:00:00 postgres: parallel worker for PID 20672  
movead   20717 18327  0 11:24 pts/3    00:00:00 grep --color=auto postgres:

On the test above, we can see that our client process is 20672 and two workers(20714、20715) are generated for the sequence scan, and these three progress pick up pages on by one.

  • Parallel Index Scan

Compare with Parallel Sequence Scan, the workers pick up the index page instead of the relation data pages. Every parallel worker scans index pages exclusive with other workers, currently it supports the btree index only.

Let’s do a test on the Parallel Index Scan in order to demonstrate how it works :

 

postgres=# create index on t2(i);
CREATE INDEX
postgres=# explain analyse select i,j from t2 where i < 100000;
Worker XXXXX Pick up index page XXX
Worker 22445 Pick up index page 267
Worker 22422 Pick up index page 268
Worker 22446 Pick up index page 269
Worker 22445 Pick up index page 270
Worker 22446 Pick up index page 271
Worker 22445 Pick up index page 272
Worker 22446 Pick up index page 273
Worker 22445 Pick up index page 274
Worker 22446 Pick up index page 275
Worker 22446 Pick up index page 276
                                                            QUERY PLAN                                                              
---------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.44..2909.45 rows=100115 width=8) (actual time=0.600..20.935 rows=99999 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Parallel Index Scan using t2_i_idx on t2 (cost=0.44..2909.45 rows=41715 width=8) (actual time=0.038..7.925 rows=33333 loops=3)
        Index Cond: (i < 100000)
Planning Time: 0.175 ms
Execution Time: 24.164 ms
(7 rows)
postgres=# select pg_backend_pid();
pg_backend_pid
----------------
         22422
(1 row)
postgres=#

It is executed so fast that I fail to show the progress, but we can see that our client process is 22422 and two workers(22445、22446) are generated for the index scan, and these three processes pick up index pages one by one.

  • Parallel Bitmap Heap Scan

I want to describe what is bitmap scan here but before that i should introduce the index scan first. ​Below is the picture of an index scan, the progress gets an index item from the index page and then scan the data page pointed by the index page. So it will scan page1 when get the index item1 and scan page1 another time when get the index item4, so that the progress should scan page2 twice and scan page4 twice to get the tuples.

It will generating redundant scans which cause decreased efficiency, we want to solve the problem by scanning all index page first and putting index items together which are pointed the at same target page. We can scan page 1 once and get tuple pointed by index item1 and index item2 (showed as figure below).

Back to Parallel Bitmap Scan now, the Bitmap scan has two parts as described above they are bitmap index scan and bitmap heap scan, only the bitmap heap scan can support parallel. The parallel work will pick up the data page to scan one by one. For example, a worker picks page1 for index item1 and index item2 while another worker picks page4 for index item2 and index item3.

Let‘s do a test on the Parallel Bitmap Scan:

postgres=# explain analyse select * from t2 where i< 100000 or i > 19999999;
Worker XXXXX pick up page XXX for XXX index items
Worker 31008 pick up page 630 for 157 index items
Worker 31009 pick up page 631 for 157 index items
Worker 31008 pick up page 632 for 157 index items
Worker 31009 pick up page 633 for 157 index items
Worker 31005 pick up page 634 for 157 index items
Worker 31008 pick up page 635 for 157 index items
Worker 31009 pick up page 636 for 147 index items
Worker 31008 pick up page 127388 for 1 index items
                                                              QUERY PLAN                                                                
-----------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=1905.80..195735.84 rows=100116 width=19) (actual time=7.429..19.487 rows=100000 loops=1)
  Workers Planned: 2
  Workers Launched: 2
  -> Parallel Bitmap Heap Scan on t2 (cost=1905.80..195735.84 rows=41715 width=19) (actual time=2.856..7.911 rows=33333 loops=3)
        Recheck Cond: ((i < 100000) OR (i > 19999999))
        Heap Blocks: exact=32
        -> BitmapOr (cost=1905.80..1905.80 rows=100116 width=0) (actual time=6.698..6.698 rows=0 loops=1)
              -> Bitmap Index Scan on t2_i_idx (cost=0.00..1851.30 rows=100115 width=0) (actual time=6.693..6.693 rows=99999 loops=1)
                    Index Cond: (i < 100000)
              -> Bitmap Index Scan on t2_i_idx (cost=0.00..4.45 rows=1 width=0) (actual time=0.003..0.003 rows=1 loops=1)
                    Index Cond: (i > 19999999)
Planning Time: 0.255 ms
Execution Time: 22.333 ms
(13 rows)

postgres=#

postgres=# select pg_backend_pid();
pg_backend_pid
----------------
         31005
(1 row)

postgres=#

It executed so fast that I fail to show the progress, but we can see that our client process is 31005 and two workers(31008、31009) are generated for the bitmap heap scan, and the three progress pick up data pages one by one.

1.3 The Mechanism Of The Parallel Scan

In this part, I want to talk about how the user backend creates workers, how workers scan data without any conflict and how the result of this work is gathered.

As shown in the figure.

Gather Node

When performing a scan, if it is judged that the plan of parallel scanning is better than that of single-process scanning, Gather Node will be added to the scan plan. All parallel programs begin with Gather Node, which has the following main functions:

  • Create the Dynamic Share Memory(dsm) for parallel query

The dsm is created in share memory and share data for all workers. So that the workers can pick up data or index pages for scan in order and the workers can put their scan results into dsm so that the GatherNode can gather tuples from dsm.

  • Start parallel work progress

GatherNode sends a message to Postmaster for requesting the start of the Work process. Work1 and Work2 were successfully launched (see figure).

  • Collect data returned by parallel processes, every worker scan for tuples and they put the scan results to their Message Queue(mq), just as the figure above that be tuples in mq. The GatherNode then will scan all mqs by loop, it keeps to return the tuple when it finds one in any mq until there is no worker be alive. The mq is created in dsm and am mq is mapped with a worker, it is where the result tuples store of every worker.

Worker scan for tuples

After the worker processes, Work1 and Work2 are forked out, the scanned node information will be obtained from the dsm, then they scan tuples from datafile and the intermediate results of execution will be stored in the dsm. There are three important intermediate results.

1. There be ‘phs_nallocated’ in ParallelBlockTableScanDesc struct
2. There be ‘btps_scanPage’ in BTParallelScanDesc struct
3. There be ‘spageptr’ in TBMSharedIteratorStatec struct

The three values are work for Parallel Sequence Scanning, Parallel Index Scanning and Parallel Bitmap Heap Scanning.

Parallel Sequence Scan schedules the worker in pages. When one worker locks a page, the other worker will not perform the scanning of the page. When each worker process acquires the next node to scan, it locks the phs_nallocates in DSM and add phs_nallocates by 1, and then releasing the lock.

Parallel Index Scan is scheduled in index pages. It uses btps_scanPage to complete the task of index page control.

The bitmap heap scan essentially performs sequence scanning on the heap, except that it scans not all heap pages. The bitmap index scan saves all pages that need to be scanned. When performing the bitmap heap scan, only pages are scanned from the saved list. Therefore, spageptr is used to complete the task of page control in the page list.

2. Executor Refactoring By Andres

As mentioned in Andres email, the parallel mechanism is limited by the current executor structure. The current executor runs the nodes by fix order so the process stays there when there is a blocker. An image that a query with muti node and current executing node is blocked and we want something can stop current node and turn to another one. For example, if a query needs scan data from two foreign tables, it will scan the two foreign tables one by one and the progress stays there when there be none tuple arrived caused by network or IO problem in the current executor. Andres wants a system to stop the first scan and turn to another one so that to raise efficiency.

In the mail, he imagines a linearized plan with a multiplexer node to decide which part of the execution tree to continue executing. The new executor can jump among nodes under control of a multiplexer node. He has a brief implementation about the linearized plan, here is the sample:

postgres=# select * from t2 where i < 10;                                    
 2019-06-21 09:56:53.853 CST [10685] LOG: pp:
 0: seqscan_first
 1: seqscan [j empty 4] > s0
 2: qual [j fail 1] < scan s0
 3: return < s0 [next 1]
 4: done
  i | j |     k      
 ---+---+------------
  1 | 1 | qazwsxedcr
  2 | 1 | qazwsxedcr
  3 | 1 | qazwsxedcr
  4 | 1 | qazwsxedcr
  5 | 1 | qazwsxedcr
  6 | 1 | qazwsxedcr
  7 | 1 | qazwsxedcr
  8 | 1 | qazwsxedcr
  9 | 1 | qazwsxedcr
(9 rows)
postgres=#
0: seqscan_first                   //begin scan
1: seqscan [j empty 4] > s0 //get a tuple and set to s0
//go to 4 if get nothing
2: qual [j fail 1] < scan s0 //Judge the tuple in s0,if not qual go to 1
3: return < s0 [next 1] //return the tuple in s0 and go to 1
4: done

This patch defines the new executor, it runs the nodes as a program jump among nodes. I think he not only wants this jump but also jump when blockers come. However, the refactoring executor is just an initial prototype and it needs more research and development.

Andres wants the refactoring executor to solve other problems described below:

  • Have a good ability on jit compile.
  • Improve code to bring some work form ExecInitNode to Planning.
  • Moving to a “push based” executor can yield noticeable speedups.
  • Create a shared state within the same prepared statement.
  • Deal with tuple result by pages instead of one by one.

3. Parallel Foreign Scan Hypothesis

3.1 What is Parallel Foreign Scan

The parallel here is for the parallelism of foreign scan nodes in a distributed query. A query SQL of a node is divided into multiple sub-SQLs and distributed to multiple shards. A Parallel Foreign scan allows multiple delegations and tuple collections to proceed simultaneously.

3.2 Main differences between Parallel Foreign scan and Parallel scan

Ordinary Parallel scan will scan the same data file at the same time by multiple processes while Parallel Foreign scan scans different data files on sharded nodes at the same time.

We can reuse the design of parallel worker processes and the design of dsm, and then change the worker’s work content to a one-to-one scan of external tables.

3.3 A way for Parallel Foreign Scan is described below

As shown in the figure.

ForeignGatherNode

It is the new node designed for the Parallel Foreign Scan. It does the same thing with Gathernode:

  • Create dynamic share memory

Just the same as what GatherNode does. It adds a shard node array which stores the shard nodes need scan and it uses ‘iterator’ instead of the intermediates to control the selection of shard node for each worker process. Normally when a worker finishes a scan on one shard node it selects the next one by ‘iterator’. What is more, it may have a difference module ‘blockercontrol’ to control the blocked worker focus on another shard node according to Andres’s new executor. Imagine that there be some IO problem on a scanning sharding, ‘blockercontrol’ can be aware and turn to the next shard node.

  • Start parallel work progresses

The ForeignGatherNode can start workers by sending message to postmaster too. But the type of worker is quite different. They run ForeignScan nodes and setup connection with shard nodes they choose. In other words, every work runs an FDW progress.

The tuples returned by FDW should be stored in mq instead of FDW space.

  • Collect data returned by parallel processesJust the same as what GatherNode does.

4. The Ending

In this blog, I have provided insight into how parallel query workes in Postgres today and using some examples demonstrated how it is works. Also discussed some ideas around parallal foreign scan and the work Andres is doing to build upto to this feature in the community.

However, this article does not have some detailed description of the implementation, such as the structure of ForeignGather, the storage structure of ForeignGather in dsm, how to realize the initialization and exec of ForeignGather, how to change the worker, etc.