

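Postgres-XL is a distributed open-source database project that uses PostgreSQL as its underlying nodes and provides transparent massively parallel processing and linearly scalable write performance.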

Postgres-XL is an open source project to provide both write-scalability and massively parallel processing transparently to PostgreSQL. It is a collection of tightly coupled database components which can be installed on more than one system or virtual machine.

Write-scalable means Postgres-XL can be configured with as many database servers as you want and can handle many more writes (updating SQL statements) than a single standalone database server could. You can have more than one database server that provides a single database view. Any database update from any database server is immediately visible to any other transactions running on different servers. Transparent means you do not necessarily need to worry about how your data is stored across multiple database servers internally.

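The company behind the Postgres-XL open-source project is StormDB. The entire codebase is based on the open-source release of Postgres-XC and appears to be a StormDB fork of it.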

In 2010, NTT's Open Source Software Center approached EnterpriseDB to
build off of NTT OSSC's experience with a project called RitaDB and
EnterpriseDB's experience with a project called GridSQL, and the
result was a new project, Postgres-XC.

In 2012, a company called StormDB was formed with some of the original
key Postgres-XC developers. StormDB added enhancements, including MPP
parallelism for performance and multi-tenant security.

In 2013, TransLattice acquired StormDB, and in 2014, open sourced it
as Postgres-XL.


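Postgres-XC modified some of PostgreSQL's code, Postgres-XL changed some of it back, and then added a lot more code on top. Pay attention to the difference between #ifdef and #ifndef.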


Fully ACID
Open Source
Cluster-wide Consistency
Multi-tenant Security

OLAP with MPP Parallelism
Online Transaction Processing
Operational Data Store
Key-value including JSON

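First, read the official overview carefully; it gives a high-level picture of the whole system. Note that in this architecture both Datanodes and Coordinators can be deployed as multiple instances, while there is only one GTM (Global Transaction Manager). The diagram shows two GTMs only because one of them is a standby.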


Q. How does Postgres-XL relate to Postgres-XC and Stado?
The project includes architects and developers who previously worked
on both Postgres-XC and Stado, and Postgres-XL contains code from
Postgres-XC. The Postgres-XL project has its own philosophy and
approach. Postgres-XL values stability, correctness and performance
over new functionality. The Postgres-XL project ultimately strives to
track and merge in code from PostgreSQL. Postgres-XL adds some
significant performance improvements like MPP parallelism and replan
avoidance on the data nodes that are not part of Postgres-XC.
Postgres-XC currently focuses on OLTP workloads. Postgres-XL is more
flexible in terms of the types of workloads it can handle including
Big Data processing thanks to its parallelism. Additionally,
Postgres-XL is more secure for multi-tenant environments. The
Postgres-XL community is also very open and welcoming to those who
wish to become more involved and contribute, whether on the mailing
lists, participating in developer meetings, or meeting in person.
Users will help drive development priorities and the project roadmap.


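The biggest difference between XL and XC: XC's logic is that if a SQL statement can be pushed down to the Datanodes, it is pushed down; otherwise all the data is read into the Coordinator and processed there. XL, by contrast, is MPP in the true sense.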



#ifdef PGXC  (changes made by Postgres-XC)
#ifndef XCP  (changes Postgres-XL made on top of XC)
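Nested guards make the #ifdef versus #ifndef distinction easy to misread. This standalone illustration assumes, purely for the example, that both PGXC and XCP are defined; the enum and the two functions are hypothetical and only report which branch the preprocessor kept.

```c
/* Assumption for this sketch only: both macros are defined. */
#define PGXC
#define XCP

typedef enum { PATH_VANILLA, PATH_PGXC_ONLY, PATH_PGXC_AND_XCP } CompiledPath;

/* Reports which branch of a nested #ifdef pair survived preprocessing. */
CompiledPath compiled_path(void)
{
#ifdef PGXC
#ifdef XCP
    return PATH_PGXC_AND_XCP;   /* kept: PGXC and XCP are both defined */
#else
    return PATH_PGXC_ONLY;      /* removed whenever XCP is defined */
#endif
#else
    return PATH_VANILLA;        /* removed whenever PGXC is defined */
#endif
}

/* #ifndef is the inverse test: its body survives only when XCP is NOT defined. */
int has_ifndef_xcp_code(void)
{
#ifndef XCP
    return 1;
#else
    return 0;
#endif
}
```

Commenting out `#define XCP` flips both results: `compiled_path()` then returns `PATH_PGXC_ONLY` and `has_ifndef_xcp_code()` returns 1.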


GTM stands for Global Transaction Manager. It provides global
transaction ID and snapshot to each transaction in Postgres-XL
database cluster. It also provides several global values, such as
sequences and a global timestamp.

A GTM can itself be configured as a backup of another GTM (a GTM-Standby),
so that the GTM service can continue even if the main GTM fails. You may
want to install the GTM-Standby on a separate server.




/*
 * Get snapshot for the given transactions. If this is the first call in the
 * transaction, a fresh snapshot is taken and returned back. For a serializable
 * transaction, repeated calls to the function will return the same snapshot.
 * For a read-committed transaction, fresh snapshot is taken every time and
 * returned to the caller.
 * The returned snapshot includes xmin (lowest still-running xact ID),
 * xmax (highest completed xact ID + 1), and a list of running xact IDs
 * in the range xmin <= xid < xmax.  It is used as follows:
 *        All xact IDs < xmin are considered finished.
 *        All xact IDs >= xmax are considered still running.
 *        For an xact ID xmin <= xid < xmax, consult list to see whether
 *        it is considered running or not.
 * This ensures that the set of transactions seen as "running" by the
 * current xact will not change after it takes the snapshot.
 * All running top-level XIDs are included in the snapshot.
 * We also update the following global variables:
 *        RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
 *            running transactions)
 * Note: this function should probably not be called with an argument that's
 * not statically allocated (see xip allocation below).
 */
GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *status)
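The visibility rule described in this comment comes down to a few comparisons. The sketch below uses a simplified, hypothetical `SketchSnapshot` struct rather than the real GTM snapshot structure, and plain `<`/`>=` on 32-bit IDs. Real PostgreSQL compares transaction IDs modulo 2^32 (`TransactionIdPrecedes`) to survive wraparound, which this sketch ignores.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;   /* simplified: wraparound is ignored */

/* Hypothetical snapshot mirroring the fields named in the comment above. */
typedef struct
{
    Xid         xmin;   /* lowest still-running xact ID */
    Xid         xmax;   /* highest completed xact ID + 1 */
    const Xid  *xip;    /* running xact IDs with xmin <= xid < xmax */
    size_t      xcnt;   /* number of entries in xip */
} SketchSnapshot;

/* Returns true if xid must be treated as still running under snap. */
bool xid_in_progress(const SketchSnapshot *snap, Xid xid)
{
    if (xid < snap->xmin)
        return false;                       /* all xids < xmin are finished */
    if (xid >= snap->xmax)
        return true;                        /* all xids >= xmax are running */
    for (size_t i = 0; i < snap->xcnt; i++) /* in between: consult the list */
        if (snap->xip[i] == xid)
            return true;
    return false;
}
```

For example, with xmin = 105, xmax = 110 and running IDs {105, 108}, xid 106 counts as finished, while 108 and anything from 110 upward count as running.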


Because the GTM has to take care of every transaction, it has to read and write an enormous number of messages, which may limit Postgres-XL's scalability. GTM-Proxy is a proxy for GTM functionality that groups requests and responses to reduce the GTM's network reads and writes. Distributing one snapshot to multiple transactions also helps reduce the GTM's network workload.
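A toy model of the grouping idea: the hypothetical `proxy_serve()` collects up to `MAX_BATCH` pending backend requests, makes one round trip to a mock GTM per batch, and hands the same snapshot to every request in that batch. `MockGtm`, `MAX_BATCH` and this batching policy are assumptions for illustration, not GTM-Proxy's actual protocol.

```c
#include <stddef.h>

#define MAX_BATCH 64   /* hypothetical batch limit */

/* Mock GTM that only counts how many network round trips it served. */
typedef struct
{
    int      round_trips;
    unsigned next_snapshot_id;
} MockGtm;

/* One round trip returns a single snapshot id for the whole batch. */
static unsigned gtm_get_snapshot_batch(MockGtm *gtm, size_t batch_len)
{
    (void) batch_len;   /* a real GTM would also receive the xact handles */
    gtm->round_trips++;
    return gtm->next_snapshot_id++;
}

/* Group pending requests into batches; every request in a batch shares the
 * snapshot returned by that batch's single round trip. */
void proxy_serve(MockGtm *gtm, unsigned *snapshot_for, size_t nrequests)
{
    for (size_t start = 0; start < nrequests; start += MAX_BATCH)
    {
        size_t   left = nrequests - start;
        size_t   len = left < MAX_BATCH ? left : MAX_BATCH;
        unsigned snap = gtm_get_snapshot_batch(gtm, len);

        for (size_t i = 0; i < len; i++)
            snapshot_for[start + i] = snap;
    }
}
```

With 100 concurrent requests the mock GTM sees 2 round trips instead of 100: requests 0 to 63 share one snapshot and requests 64 to 99 share the next.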


The Coordinator is an entry point to Postgres-XL from applications.
You can run more than one Coordinator simultaneously in the cluster.
Each Coordinator behaves just like a PostgreSQL database server, while
all the Coordinators handle transactions in a harmonized way so that
any transaction coming into one Coordinator is protected against any
other transactions coming into others. Updates by a transaction are
visible immediately to others running in other Coordinators. To
simplify the load balance of Coordinators and Datanodes, as mentioned
below, it is highly advised to install the same number of Coordinators
and Datanodes on a server.

The code lives mostly in /src/backend/pgxc.


The Datanode is very close to PostgreSQL itself because it just handles incoming statements locally.

The Coordinator and Datanode share the same binary, but their behavior is a little different. The Coordinator decomposes incoming statements into those handled by Datanodes. If necessary, the Coordinator materializes responses from Datanodes to calculate the final response to applications.



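Postgres-XL adds a namespace called STORM_CATALOG_NAMESPACE, and the newly added system tables and related objects all live in it. DDL statements are essentially forwarded to all Datanodes and Coordinators; for the concrete code and logic, see these functions in \src\backend\pgxc\pool\execRemote.c: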

/*
 * Execute utility statement on multiple Datanodes
 * It does approximately the same as
 * RemoteQueryState *state = ExecInitRemoteQuery(plan, estate, flags);
 * Assert(TupIsNull(ExecRemoteQuery(state)));
 * ExecEndRemoteQuery(state);
 * But does not need an Estate instance and does not do some unnecessary work,
 * like allocating tuple slots.
 */
ExecRemoteUtility(RemoteQuery *node)

/*
 * Send specified statement down to the PGXC node
 */
static int
pgxc_node_send_query_internal(PGXCNodeHandle * handle, const char *query,
        bool rollback)
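The fan-out pattern behind ExecRemoteUtility() can be pictured as a loop over node handles. Everything here is hypothetical: `NodeHandleSketch` stands in for `PGXCNodeHandle` and `send_query()` for the real send routine. The real code also has to collect each node's response and handle errors, which this sketch reduces to a failure count.

```c
#include <stddef.h>

/* Hypothetical stand-in for PGXCNodeHandle. */
typedef struct
{
    const char *name;
    int         fail;       /* simulation knob: sending to this node fails */
    int         received;   /* statements this node has received */
} NodeHandleSketch;

/* Stand-in for the real send routine: 0 on success, -1 on error. */
static int send_query(NodeHandleSketch *h, const char *query)
{
    (void) query;
    if (h->fail)
        return -1;
    h->received++;
    return 0;
}

/* Forward one utility (DDL) statement to every Datanode and Coordinator;
 * returns the number of nodes the statement could not be sent to. */
int forward_utility(NodeHandleSketch *nodes, size_t n, const char *query)
{
    int failures = 0;

    for (size_t i = 0; i < n; i++)
        if (send_query(&nodes[i], query) != 0)
            failures++;
    return failures;
}
```

In a real cluster any failure would have to abort the DDL everywhere, since a catalog change applied on only some nodes would leave the cluster inconsistent.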



EXECUTE DIRECT ON ( nodename [, ... ] )
Select some data in a given table tenk1 on remote Datanode named dn1:
EXECUTE DIRECT ON (dn1) 'SELECT * FROM tenk1 WHERE col_char = ''foo''';
Select local timestamp of a remote node named coord2:
EXECUTE DIRECT ON (coord2) 'select clock_timestamp()';
Select list of tables of a remote node named dn50:
EXECUTE DIRECT ON (dn50) 'select tablename from pg_tables';

In fact, standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) in \src\backend\optimizer\plan\planner.c checks for a direct statement like this:

/*
 * pgxc_direct_planner
 * The routine tries to see if the statement can be completely evaluated on the
 * datanodes. In such cases coordinator is not needed to evaluate the statement,
 * and just acts as a proxy. A statement can be completely shipped to the remote
 * node if every row of the result can be evaluated on a single datanode.
 * For example:
 * Only EXECUTE DIRECT statements are sent directly as of now
 */
#ifdef XCP
	if (IS_PGXC_COORDINATOR && !IsConnFromCoord() && parse->utilityStmt &&
			IsA(parse->utilityStmt, RemoteQuery))
		return pgxc_direct_planner(parse, cursorOptions, boundParams);
#endif




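The most important work at this stage happens in preprocess_targetlist(PlannerInfo *root, List *tlist) under [\src\backend\optimizer\prep]. The code wrapped in #ifdef XCP there is what XL needs to distribute data across Datanodes. It fills in two key variables: distribution->nodes (the nodes the table involves) and distribution->restrictNodes (for INSERT and UPDATE, some nodes can be filtered out based on the primary key).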
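To make `distribution->nodes` versus `distribution->restrictNodes` concrete, this sketch uses a 32-bit mask as a stand-in for the planner's `Bitmapset` and a hypothetical `key mod nnodes` placement rule. An empty restriction means "no filtering"; when an INSERT or UPDATE pins the distribution key to a constant, only the owning node remains. All names here are hypothetical.

```c
#include <stdint.h>

typedef uint32_t NodeMask;   /* bit i set => node i; stand-in for a Bitmapset */

/* Hypothetical placement rule used only in this sketch: key mod nnodes. */
static int owner_node(int64_t key, int nnodes)
{
    int64_t r = key % nnodes;               /* C remainder may be negative */
    return (int) (r < 0 ? r + nnodes : r);
}

/* Every node the table is spread over (the role of distribution->nodes). */
NodeMask table_nodes(int nnodes)
{
    return nnodes >= 32 ? UINT32_MAX : (((NodeMask) 1 << nnodes) - 1);
}

/* The role of distribution->restrictNodes: 0 (no restriction) unless the
 * statement fixes the distribution key to a constant. */
NodeMask restrict_for_key(int nnodes, int key_is_constant, int64_t key)
{
    if (!key_is_constant)
        return 0;
    return (NodeMask) 1 << owner_node(key, nnodes);
}

/* Nodes the statement actually has to visit. */
NodeMask nodes_to_visit(NodeMask nodes, NodeMask restricted)
{
    return restricted ? (nodes & restricted) : nodes;
}
```

For instance, on a 4-node table, `INSERT ... VALUES (10, ...)` under this toy rule only visits node 2, while an UPDATE without a constant key visits all four nodes.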

Joins, scans and GROUP BY each distribute data in their own way, but the resulting plans share the same structure.


/*
 * make_remotesubplan
 * 	Create a RemoteSubplan node to execute subplan on remote nodes.
 *  lefttree - the subplan which we want to push down to remote node.
 *  resultDistribution - the distribution of the remote result. May be NULL -
 * results are coming to the invoking node
 *  execDistribution - determines how source data of the subplan are
 * distributed, where we should send the subplan and how to combine results.
 *	pathkeys - the remote subplan is sorted according to these keys, executor
 * 		should perform merge sort of incoming tuples
 */
RemoteSubplan *
make_remotesubplan(PlannerInfo *root,
				   Plan *lefttree,
				   Distribution *resultDistribution,
				   Distribution *execDistribution,
				   List *pathkeys)



/*
 * create_remotesubplan_path
 *    Redistribute the data to match the distribution.
 * Creates a RemoteSubPath on top of the path, redistributing the data
 * according to the specified distribution.
 */
Path *
create_remotesubplan_path(PlannerInfo *root, Path *subpath,
                          Distribution *distribution)

/*
 * Set a RemoteSubPath on top of the specified node and set specified
 * distribution to it
 */
static Path *
redistribute_path(Path *subpath, char distributionType,
				  Bitmapset *nodes, Bitmapset *restrictNodes,
				  Node* distributionExpr)



/*
 * set_scanpath_distribution
 *	  Assign distribution to the path which is a base relation scan.
 */
static void
set_scanpath_distribution(PlannerInfo *root, RelOptInfo *rel, Path *pathnode)




/*
 * Analyze join parameters and set distribution of the join node.
 * If there are possible alternate distributions the respective paths are
 * returned as a list so caller can cost all of them and choose cheapest to
 * continue.
 */
static List *
set_joinpath_distribution(PlannerInfo *root, JoinPath *pathnode)
{
	/*
	 * If both subpaths are distributed by replication, the resulting
	 * distribution will be replicated on smallest common set of nodes.
	 * Catalog tables are the same on all nodes, so treat them as replicated
	 * on all nodes.
	 */

	/*
	 * Check if we have inner replicated
	 * The "both replicated" case is already checked, so if innerd
	 * is replicated, then outerd is not replicated and it is not NULL.
	 * This case is not acceptable for some join types. If outer relation is
	 * nullable data nodes will produce joined rows with NULLs for cases when
	 * matching row exists, but on other data node.
	 */

	/*
	 * Check if we have outer replicated
	 * The "both replicated" case is already checked, so if outerd
	 * is replicated, then innerd is not replicated and it is not NULL.
	 * This case is not acceptable for some join types. If inner relation is
	 * nullable data nodes will produce joined rows with NULLs for cases when
	 * matching row exists, but on other data node.
	 */

	/*
	 * This join is still allowed if inner and outer paths have
	 * equivalent distribution and joined along the distribution keys.
	 */

	/*
	 * Build cartesian product, if no hashable restrictions are found.
	 * Perform coordinator join in such cases. If this join would be a part of
	 * larger join, it will be handled as replicated.
	 * To do that leave join distribution NULL and place a RemoteSubPath node on
	 * top of each subpath to provide access to joined result sets.
	 * Do not redistribute paths that already have NULL distribution, this is
	 * possible if performing outer join on a coordinator and a datanode
	 * relations.
	 */
}
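The rules in these comments can be condensed into a small decision function. This is a hypothetical simplification: it knows only replicated and hash distributions, models node sets as bitmasks, considers a single join clause, and returns a label instead of building paths. The real function covers more cases and returns candidate paths for costing.

```c
#include <stdint.h>

typedef enum { DIST_REPLICATED, DIST_HASH } DistKind;
typedef enum { JT_INNER, JT_LEFT, JT_RIGHT, JT_FULL } JoinKind;
typedef enum { JOIN_REPLICATED, JOIN_ON_OUTER, JOIN_ON_INNER, JOIN_REDISTRIBUTE } JoinPlan;

typedef struct
{
    DistKind kind;
    uint32_t nodes;       /* bitmask of nodes holding the relation */
    int      key_attno;   /* distribution column, meaningful for DIST_HASH */
} DistSketch;

/* outer_attno/inner_attno: the columns compared by the join clause. */
JoinPlan choose_join_plan(const DistSketch *outer, const DistSketch *inner,
                          JoinKind jt, int outer_attno, int inner_attno,
                          uint32_t *result_nodes)
{
    /* Both replicated: replicated result on the smallest common node set. */
    if (outer->kind == DIST_REPLICATED && inner->kind == DIST_REPLICATED)
    {
        *result_nodes = outer->nodes & inner->nodes;
        return JOIN_REPLICATED;
    }
    /* Inner replicated (present wherever outer is): join locally, unless the
     * outer side is nullable (RIGHT/FULL), where a node would emit NULL rows
     * although the match lives on another node. */
    if (inner->kind == DIST_REPLICATED && (outer->nodes & ~inner->nodes) == 0 &&
        (jt == JT_INNER || jt == JT_LEFT))
    {
        *result_nodes = outer->nodes;
        return JOIN_ON_OUTER;
    }
    /* Outer replicated: the mirror case, unacceptable if inner is nullable. */
    if (outer->kind == DIST_REPLICATED && (inner->nodes & ~outer->nodes) == 0 &&
        (jt == JT_INNER || jt == JT_RIGHT))
    {
        *result_nodes = inner->nodes;
        return JOIN_ON_INNER;
    }
    /* Equivalent hash distributions joined along the distribution keys. */
    if (outer->kind == DIST_HASH && inner->kind == DIST_HASH &&
        outer->nodes == inner->nodes &&
        outer_attno == outer->key_attno && inner_attno == inner->key_attno)
    {
        *result_nodes = outer->nodes;
        return JOIN_ON_OUTER;
    }
    *result_nodes = 0;
    return JOIN_REDISTRIBUTE;   /* move data, or fall back to a coordinator join */
}
```

For instance, a LEFT JOIN of a hash-distributed outer table with a replicated inner table stays on the outer table's nodes, while the same pair in a RIGHT JOIN falls through to redistribution.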


GROUP BY is handled in a way similar to joins:


/*
 * grouping_distribution_match
 * 	Check if the path distribution matches grouping distribution.
 * Grouping preserves distribution if the distribution key is one of the
 * grouping keys (arbitrary one). In that case it's guaranteed that groups
 * on different nodes do not overlap, and we can push the aggregation to
 * remote nodes as a whole.
 * Otherwise we need to either fetch all the data to the coordinator and
 * perform the aggregation there, or use two-phase aggregation, with the
 * first phase (partial aggregation) pushed down, and the second phase
 * (combining and finalizing the results) executed on the coordinator.
 * XXX This is used not only for plain aggregation, but also for various
 * other paths, relying on grouping infrastructure (DISTINCT ON, UNIQUE).
 */
static bool
grouping_distribution_match(PlannerInfo *root, Query *parse, Path *path,
							List *clauses, List *targetList)
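Both ideas in this comment can be shown with small hypothetical helpers: `grouping_keeps_distribution()` is the match test (is the distribution key among the grouping keys?), and the AVG pair shows two-phase aggregation, where each Datanode returns a partial (sum, count) state and the Coordinator combines and finalizes them. Attribute numbers stand in for real grouping expressions.

```c
#include <stdbool.h>
#include <stddef.h>

/* True if the distribution column is one of the grouping columns, so every
 * group lives entirely on one node and aggregation can be pushed down whole. */
bool grouping_keeps_distribution(int dist_attno, const int *group_attnos, size_t ngroup)
{
    for (size_t i = 0; i < ngroup; i++)
        if (group_attnos[i] == dist_attno)
            return true;
    return false;
}

typedef struct { double sum; long count; } AvgState;   /* partial AVG state */

/* Phase 1, on each Datanode: partial aggregation over the local rows. */
AvgState avg_partial(const double *rows, size_t n)
{
    AvgState s = {0.0, 0};

    for (size_t i = 0; i < n; i++)
    {
        s.sum += rows[i];
        s.count++;
    }
    return s;
}

/* Phase 2, on the Coordinator: combine the partial states, then finalize.
 * (SQL AVG over zero rows is NULL; this sketch returns 0.0 instead.) */
double avg_combine_final(const AvgState *parts, size_t nparts)
{
    double sum = 0.0;
    long   count = 0;

    for (size_t i = 0; i < nparts; i++)
    {
        sum += parts[i].sum;
        count += parts[i].count;
    }
    return count > 0 ? sum / (double) count : 0.0;
}
```

Combining (sum, count) pairs rather than averaging per-node averages is what makes phase 2 correct: nodes holding {1, 2, 3} and {4} have averages 2 and 4, whose mean is 3, while the true AVG is 2.5.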
/*
 * grouping_planner
 *      Perform planning steps related to grouping, aggregation, etc.
 * This function adds all required top-level processing to the scan/join
 * Path(s) produced by query_planner.
 * If inheritance_update is true, we're being called from inheritance_planner
 * and should not include a ModifyTable step in the resulting Path(s).
 * (inheritance_planner will create a single ModifyTable node covering all the
 * target tables.)
 * tuple_fraction is the fraction of tuples we expect will be retrieved.
 * tuple_fraction is interpreted as follows:
 *      0: expect all tuples to be retrieved (normal case)
 *      0 < tuple_fraction < 1: expect the given fraction of tuples available
 *        from the plan to be retrieved
 *      tuple_fraction >= 1: tuple_fraction is the absolute number of tuples
 *        expected to be retrieved (ie, a LIMIT specification)
 * Returns nothing; the useful output is in the Paths we attach to the
 * (UPPERREL_FINAL, NULL) upperrel in *root.  In addition,
 * root->processed_tlist contains the final processed targetlist.
 * Note that we have not done set_cheapest() on the final rel; it's convenient
 * to leave this to the caller.
 */
static void
grouping_planner(PlannerInfo *root, bool inheritance_update,
                 double tuple_fraction)



#ifdef PGXC
		case T_RemoteQuery:
			result = (PlanState *) ExecInitRemoteQuery((RemoteQuery *) node,
													    estate, eflags);
			break;
#ifdef XCP
		case T_RemoteSubplan:
			result = (PlanState *) ExecInitRemoteSubplan((RemoteSubplan *) node,
													     estate, eflags);
			break;
#endif /* XCP */
#endif /* PGXC */



#ifdef PGXC
		case T_RemoteQueryState:
			result = ExecRemoteQuery((RemoteQueryState *) node);
			break;
#ifdef XCP
		case T_RemoteSubplanState:
			result = ExecRemoteSubplan((RemoteSubplanState *) node);
			break;
#endif /* XCP */
#endif /* PGXC */


/*
 * Execute step of PGXC plan.
 * The step specifies a command to be executed on specified nodes.
 * On first invocation connections to the data nodes are initialized and
 * command is executed. Further, as well as within subsequent invocations,
 * responses are received until step is completed or there is a tuple to emit.
 * If there is a tuple it is returned, otherwise returned NULL. The NULL result
 * from the function indicates completed step.
 * The function returns at most one tuple per invocation.
 */
TupleTableSlot *
ExecRemoteQuery(PlanState *pstate)
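The "at most one tuple per call, NULL when done" contract is the usual executor iterator protocol. A hypothetical, heavily simplified version follows: each connection's rows are assumed to be already buffered in memory and are drained one connection after another, whereas the real function reads from the network (via pgxc_node_receive()) as responses arrive.

```c
#include <stddef.h>

/* Hypothetical per-connection buffer of already-received rows. */
typedef struct
{
    const int *rows;
    size_t     nrows;
    size_t     pos;        /* next row to hand out */
} ConnBufSketch;

typedef struct
{
    ConnBufSketch *conns;
    size_t         nconns;
    size_t         current; /* connection currently being drained */
} RemoteQueryStateSketch;

/* Returns a pointer to the next tuple, or NULL once every connection is
 * drained; at most one tuple is returned per call. */
const int *exec_remote_query_sketch(RemoteQueryStateSketch *st)
{
    while (st->current < st->nconns)
    {
        ConnBufSketch *c = &st->conns[st->current];

        if (c->pos < c->nrows)
            return &c->rows[c->pos++];
        st->current++;      /* this connection is finished, move on */
    }
    return NULL;            /* NULL signals that the step is complete */
}
```

A caller simply loops `while ((t = exec_remote_query_sketch(&st)) != NULL)`, exactly as a parent plan node keeps pulling from its child until it gets NULL.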

It calls pgxc_node_receive to read data from the distributed nodes:

/*
 * Wait while at least one of specified connections has data available and read
 * the data into the buffer
 * Returning state code
 *        DNStatus_OK      = 0,
 *        DNStatus_ERR     = 1,
 *        DNStatus_EXPIRED = 2,
 *        DNStatus_BUTTY
 */
pgxc_node_receive(const int conn_count,
                  PGXCNodeHandle ** connections, struct timeval * timeout)






typedef struct SQueueHeader *SharedQueue;


/* Shared queue header */
typedef struct SQueueHeader
{
    char        sq_key[SQUEUE_KEYSIZE]; /* Hash entry key should be at the
                                 * beginning of the hash entry */
    int            sq_pid;         /* Process id of the producer session */
    int            sq_nodeid;        /* Node id of the producer parent */
    SQueueSync *sq_sync;        /* Associated synchronization objects */
    int            sq_refcnt;        /* Reference count to this entry */
    bool        stat_finish;
    long        stat_paused;
    int            sq_nconsumers;    /* Number of consumers */
    ConsState     sq_consumers[0];/* variable length array */
} SQueueHeader;
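A single-process sketch of the shared-queue shape: one producer, a header followed by a variable-length array of per-consumer slots, and a small ring buffer per consumer. It uses a C99 flexible array member where the original uses the older `[0]` idiom, and it has no locking; the real structure lives in shared memory and relies on `sq_sync` for synchronization. `RING_SIZE` and the put/get helpers are hypothetical.

```c
#include <stdbool.h>
#include <stdlib.h>

#define RING_SIZE 8   /* hypothetical per-consumer capacity */

typedef struct
{
    int items[RING_SIZE];
    int head, tail, count;      /* ring-buffer bookkeeping */
} ConsumerRing;

typedef struct
{
    int          producer_pid;  /* cf. sq_pid */
    int          nconsumers;    /* cf. sq_nconsumers */
    ConsumerRing consumers[];   /* C99 flexible array member, cf. sq_consumers[0] */
} SQueueSketch;

/* Allocate the header and all consumer slots in one zeroed block. */
SQueueSketch *squeue_create(int producer_pid, int nconsumers)
{
    SQueueSketch *q = calloc(1, sizeof(SQueueSketch) +
                                (size_t) nconsumers * sizeof(ConsumerRing));
    if (q != NULL)
    {
        q->producer_pid = producer_pid;
        q->nconsumers = nconsumers;
    }
    return q;
}

/* Producer side: route a value to one consumer; false means that ring is
 * full and the producer would have to wait or buffer the value elsewhere. */
bool squeue_put(SQueueSketch *q, int consumer, int value)
{
    ConsumerRing *r = &q->consumers[consumer];

    if (r->count == RING_SIZE)
        return false;
    r->items[r->tail] = value;
    r->tail = (r->tail + 1) % RING_SIZE;
    r->count++;
    return true;
}

/* Consumer side: take the oldest value; false means nothing is queued. */
bool squeue_get(SQueueSketch *q, int consumer, int *value)
{
    ConsumerRing *r = &q->consumers[consumer];

    if (r->count == 0)
        return false;
    *value = r->items[r->head];
    r->head = (r->head + 1) % RING_SIZE;
    r->count--;
    return true;
}
```

Keeping one buffer per consumer lets a producer that redistributes rows (as a RemoteSubplan does) feed several consumers independently, so a slow consumer only fills its own ring.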


/*
 * DistributionType - how to distribute the data
 */
typedef enum DistributionType
{
	DISTTYPE_REPLICATION,			/* Replicated */
	DISTTYPE_HASH,				/* Hash partitioned */
	DISTTYPE_ROUNDROBIN,			/* Round Robin */
	DISTTYPE_MODULO				/* Modulo partitioned */
} DistributionType;
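How a row's target Datanode follows from each of these distribution types can be sketched in a few lines. The integer mixer below is an arbitrary finalizer-style bit mix, not Postgres-XL's per-type hash functions; the round-robin counter is a plain variable supplied by the caller; and -1 is a hypothetical marker for "all Datanodes".

```c
#include <stdint.h>

typedef enum { SK_REPLICATION, SK_HASH, SK_ROUNDROBIN, SK_MODULO } DistTypeSketch;

/* Arbitrary stand-in hash for this sketch only. */
static uint32_t hash_int64(int64_t v)
{
    uint64_t x = (uint64_t) v;

    x ^= x >> 33;
    x *= 0xff51afd7ed558ccdULL;
    x ^= x >> 33;
    return (uint32_t) x;
}

/* Returns the target Datanode index for one row, or -1 for "every node". */
int target_node(DistTypeSketch type, int64_t key, int nnodes, unsigned *rr_counter)
{
    switch (type)
    {
        case SK_REPLICATION:
            return -1;                              /* row goes to all Datanodes */
        case SK_HASH:
            return (int) (hash_int64(key) % (uint32_t) nnodes);
        case SK_MODULO:
        {
            int64_t r = key % nnodes;               /* C remainder may be negative */
            return (int) (r < 0 ? r + nnodes : r);
        }
        case SK_ROUNDROBIN:
            return (int) ((*rr_counter)++ % (unsigned) nnodes); /* key ignored */
    }
    return -1;
}
```

The design difference is visible in the signature: HASH and MODULO depend only on the key, so the same key always lands on the same node (which is what makes restrictNodes-style pruning possible), while ROUNDROBIN depends only on the insertion order.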

DISTRIBUTE BY
Note: The following description applies only to Postgres-XL.

This clause specifies how the table is distributed or replicated among
Datanodes.

REPLICATION Each row of the table will be replicated to all the
Datanodes of the Postgres-XL database cluster.

ROUNDROBIN Each row of the table will be placed in one of the
Datanodes in a round-robin manner. The value of the row is not
needed to determine which Datanode it goes to.

HASH ( column_name ) Each row of the table will be placed based on the
hash value of the specified column. The following types are allowed as the
distribution column: INT8, INT2, OID, INT4, BOOL, INT2VECTOR, …

Please note that floating point is not allowed as a basis of the
distribution column.

MODULO ( column_name ) Each row of the table will be placed based on
the modulo of the specified column. The following types are allowed as the
distribution column: INT8, INT2, OID, INT4, BOOL, INT2VECTOR, …

Please note that floating point is not allowed as a basis of the
distribution column.

If DISTRIBUTE BY is not specified, a column with a UNIQUE constraint will
be chosen as the distribution key. If no such column is specified, the
distribution column is the first eligible column in the definition. If
no such column is found, then the table will be distributed by



/*
 * Returns true if 2PC is required for consistent commit: if there was write
 * activity on two or more nodes within current transaction.
 */
IsTwoPhaseCommitRequired(bool localWrite)


/*
 * Do pre-commit processing for remote nodes which includes Datanodes and
 * Coordinators. If more than one node is involved in the transaction write
 * activity, then we must run 2PC. For 2PC, we do the following steps:
 *  1. PREPARE the transaction locally if the local node is involved in the
 *     transaction. If local node is not involved, skip this step and go to the
 *     next step
 *  2. PREPARE the transaction on all the remote nodes. If any node fails to
 *     PREPARE, directly go to step 6
 *  3. Now that all the involved nodes are PREPAREd, we can commit the
 *     transaction. We first inform the GTM that the transaction is fully
 *     PREPARED and also supply the list of the nodes involved in the
 *     transaction
 *  4. COMMIT PREPARED the transaction on all the remote nodes and then
 *     finally COMMIT PREPARED on the local node if it's involved in the
 *     transaction and start a new transaction so that normal commit processing
 *     works unchanged. Go to step 5.
 *  5. Return and let the normal commit processing resume
 *  6. Abort by ereporting the error and let normal abort-processing take
 *     charge.
 */
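The commit decision and the numbered steps can be simulated in memory. In this hypothetical sketch each `Participant` is a node that may have written data, `prepare_ok` simulates whether its PREPARE succeeds, and the local node is treated like any other participant. Real nodes run PREPARE TRANSACTION / COMMIT PREPARED over the network, and the abort path goes through normal abort processing.

```c
#include <stdbool.h>
#include <stddef.h>

typedef struct
{
    bool wrote;        /* had write activity in this transaction */
    bool prepare_ok;   /* simulation knob: can this node PREPARE? */
    bool prepared;
    bool committed;
} Participant;

/* Like IsTwoPhaseCommitRequired(): 2PC only when two or more nodes wrote. */
bool two_phase_required(const Participant *p, size_t n)
{
    size_t writers = 0;

    for (size_t i = 0; i < n; i++)
        if (p[i].wrote)
            writers++;
    return writers >= 2;
}

/* Returns true if the transaction committed, false if it was aborted. */
bool commit_transaction_sketch(Participant *p, size_t n, bool *gtm_informed)
{
    *gtm_informed = false;
    if (!two_phase_required(p, n))
    {
        for (size_t i = 0; i < n; i++)      /* ordinary one-phase commit */
            if (p[i].wrote)
                p[i].committed = true;
        return true;
    }
    /* Steps 1-2: PREPARE every node with write activity. */
    for (size_t i = 0; i < n; i++)
    {
        if (!p[i].wrote)
            continue;
        if (!p[i].prepare_ok)
        {
            /* Step 6: abort and undo the PREPAREs that already succeeded. */
            for (size_t j = 0; j < n; j++)
                p[j].prepared = false;
            return false;
        }
        p[i].prepared = true;
    }
    /* Step 3: report the fully prepared transaction to the GTM. */
    *gtm_informed = true;
    /* Step 4: COMMIT PREPARED on every prepared node. */
    for (size_t i = 0; i < n; i++)
        if (p[i].prepared)
        {
            p[i].prepared = false;
            p[i].committed = true;
        }
    return true;                            /* step 5: normal commit resumes */
}
```

Informing the GTM only after every PREPARE has succeeded (step 3) is the point of no return: from then on the transaction must be committed everywhere, even if a node has to finish its COMMIT PREPARED after recovering.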




CREATE BARRIER -- create a new barrier
CREATE BARRIER barrier_name
Note: The following description applies only to Postgres-XL
CREATE BARRIER is a new SQL command specific to Postgres-XL that consistently creates a new XLOG record on each node of the cluster. Essentially, a barrier is a consistent point in the cluster that you can recover to. Note that barriers are currently created manually, not automatically. Without barriers, if you recover an individual component, it may not be consistent with the other nodes, depending on when it was committed.
A barrier is created from a remote Coordinator via a 2PC-like mechanism in three phases: prepare, execute and end. A new recovery parameter called recovery_target_barrier has been added in recovery.conf. To perform a complete PITR recovery, set recovery_target_barrier to the name of a barrier that has already been created, distribute recovery.conf to the data folder of each node, and then restart the nodes one by one.
The default barrier name is dummy_barrier_id. It is used when no barrier name is specified when using CREATE BARRIER.




PAUSE CLUSTER -- pause the Postgres-XL cluster

Synopsis

PAUSE CLUSTER

Note: The following description applies only to Postgres-XL

PAUSE CLUSTER is a SQL command specific to Postgres-XL that pauses
cluster operation.

Pause blocks any new transactions from starting and waits until
existing transactions complete, then returns. Existing sessions remain
connected to the Coordinators; any new statements are simply held up
and not executed.

The session that paused the cluster can perform tasks exclusively on
the cluster. This is useful for maintenance tasks to resolve a
problem, restart a Datanode, manually failover a Datanode, etc.
Applications will not receive error messages unless they themselves
time out; statement execution is just briefly suspended.

Once the DBA has completed whatever tasks were needed, the command
UNPAUSE CLUSTER can be used.
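The pause semantics (block new work, wait for running transactions to finish, resume on unpause) map naturally onto a mutex and a condition variable. The sketch below models them inside a single process with POSIX threads; `ClusterGate` and its four functions are hypothetical and leave out everything that makes the real command cluster-wide. Link with `-pthread` where the platform requires it.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical gate modelling PAUSE/UNPAUSE CLUSTER inside one process. */
typedef struct
{
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    bool            paused;
    int             active;    /* transactions currently running */
} ClusterGate;

#define CLUSTER_GATE_INIT { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false, 0 }

/* New transactions block (rather than fail) while the cluster is paused. */
void txn_begin(ClusterGate *g)
{
    pthread_mutex_lock(&g->lock);
    while (g->paused)
        pthread_cond_wait(&g->cond, &g->lock);
    g->active++;
    pthread_mutex_unlock(&g->lock);
}

void txn_end(ClusterGate *g)
{
    pthread_mutex_lock(&g->lock);
    g->active--;
    pthread_cond_broadcast(&g->cond);   /* may wake a session waiting to pause */
    pthread_mutex_unlock(&g->lock);
}

/* PAUSE: stop new transactions, then wait for running ones to complete.
 * The pausing session itself does not pass through txn_begin(). */
void cluster_pause(ClusterGate *g)
{
    pthread_mutex_lock(&g->lock);
    g->paused = true;
    while (g->active > 0)
        pthread_cond_wait(&g->cond, &g->lock);
    pthread_mutex_unlock(&g->lock);
}

/* UNPAUSE: let the blocked transactions proceed. */
void cluster_unpause(ClusterGate *g)
{
    pthread_mutex_lock(&g->lock);
    g->paused = false;
    pthread_cond_broadcast(&g->cond);
    pthread_mutex_unlock(&g->lock);
}
```

Blocking in `txn_begin()` instead of returning an error matches the described behavior: clients only notice the pause as a delay, unless their own timeouts fire first.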


PAUSE CLUSTER does not conform to the SQL standard; it is a
Postgres-XL-specific command.



UNPAUSE CLUSTER -- unpause the Postgres-XL cluster

Synopsis

UNPAUSE CLUSTER

Note: The following description applies only to Postgres-XL

UNPAUSE CLUSTER is a SQL command specific to Postgres-XL that unpauses
cluster operation.

If the DBA previously paused the cluster via the command PAUSE
CLUSTER, the DBA can resume operation via UNPAUSE CLUSTER.


UNPAUSE CLUSTER does not conform to the SQL standard; it is a
Postgres-XL-specific command.

