Presto Source Code Walkthrough: From SQL to the AST (Abstract Syntax Tree)

Updated: 2024-10-25 13:16:04


The previous post, "Presto Source Code Walkthrough: Submitting a Query", covered how SQL is submitted from the client to the Coordinator via the CLI and via JDBC. In this post we look at how the Coordinator preprocesses a submitted query — how the SQL text becomes an AST (abstract syntax tree).

The source-level sequence diagram is as follows:

Next, let's walk through the most important classes and methods in this flow (some details are skipped for now).
QueuedStatementResource handles the client's RESTful requests, including accepting queries and reporting execution status. Its key endpoints are:

| URL | Method | Purpose |
| --- | --- | --- |
| /v1/statement | POST | Submit a query |
| queued/{queryId}/{slug}/{token} | GET | Poll query execution status |

The first request goes to /v1/statement and is handled by QueuedStatementResource.postStatement. There, the HTTP parameters of the request are processed, a SessionContext is constructed (used later to build the Session object), and a nextUri (the URL for the client's next request) is returned. One important detail: while building the SessionContext, the values of the X-Presto-Prepared-Statement request header are run through the parser — the same syntactic analysis described later in this post. Only EXECUTE-type SQL takes this path.

```java
preparedStatements = parsePreparedStatementsHeaders(headers);
```
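As a toy illustration of that header handling, the sketch below decodes a header value of the form `name=url-encoded-sql`. The class and method names are invented and this is only a minimal stand-in for `parsePreparedStatementsHeaders` — the real code also handles repeated headers and validation:

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: decode "name=url-encoded-sql" pairs such as those
// carried in the X-Presto-Prepared-Statement header (names invented).
public class PreparedHeaderDemo {
    static Map<String, String> parse(String headerValue) {
        Map<String, String> statements = new LinkedHashMap<>();
        for (String entry : headerValue.split(",")) {
            // split on the first '='; any '=' inside the SQL is percent-encoded
            int eq = entry.indexOf('=');
            String name = URLDecoder.decode(entry.substring(0, eq), StandardCharsets.UTF_8);
            String sql = URLDecoder.decode(entry.substring(eq + 1), StandardCharsets.UTF_8);
            statements.put(name, sql);
        }
        return statements;
    }

    public static void main(String[] args) {
        Map<String, String> m = parse("my_query=SELECT+1");
        System.out.println(m); // {my_query=SELECT 1}
    }
}
```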

The first /v1/statement request does not submit the query right away; submission is triggered when the client follows the returned nextUri for its second request. The actual logic lives in Query.waitForDispatched, which calls DispatchManager.createQuery:

```java
private ListenableFuture<?> waitForDispatched()
{
    // if query submission has not finished, wait for it to finish
    synchronized (this) {
        if (querySubmissionFuture == null) {
            querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query);
        }
        if (!querySubmissionFuture.isDone()) {
            return querySubmissionFuture;
        }
    }
    // otherwise, wait for the query to finish
    return dispatchManager.waitForDispatched(queryId);
}
```
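The synchronized lazy-creation idiom above can be boiled down to a stdlib-only sketch (names invented; CompletableFuture stands in for Guava's ListenableFuture): the first caller kicks off the submission, and every later caller just observes the same future.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the lazy, synchronized future-creation pattern used in
// Query.waitForDispatched. Class and method names are invented.
public class LazySubmitDemo {
    private CompletableFuture<Void> submission;

    synchronized CompletableFuture<Void> submitOnce(Runnable submit) {
        if (submission == null) {
            // first caller triggers the actual submission
            submission = CompletableFuture.runAsync(submit);
        }
        return submission;
    }

    public static void main(String[] args) {
        LazySubmitDemo demo = new LazySubmitDemo();
        CompletableFuture<Void> first = demo.submitOnce(() -> System.out.println("submitted"));
        CompletableFuture<Void> second = demo.submitOnce(() -> System.out.println("never runs"));
        first.join();
        System.out.println(first == second); // true: submission happens exactly once
    }
}
```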

Once the query finishes, results are fetched through ExecutingStatementResource's /v1/statement/executing/{queryId}/{slug}/{token} endpoint. ExecutingStatementResource only interacts with the client after the query has actually been submitted.

```java
// ExecutingStatementResource
@GET
@Path("{queryId}/{slug}/{token}")
@Produces(MediaType.APPLICATION_JSON)
public void getQueryResults(
        @PathParam("queryId") QueryId queryId,
        @PathParam("slug") String slug,
        @PathParam("token") long token,
        @QueryParam("maxWait") Duration maxWait,
        @QueryParam("targetResultSize") DataSize targetResultSize,
        @HeaderParam(X_FORWARDED_PROTO) String proto,
        @Context UriInfo uriInfo,
        @Suspended AsyncResponse asyncResponse)
{
    Query query = getQuery(queryId, slug, token);
    if (isNullOrEmpty(proto)) {
        proto = uriInfo.getRequestUri().getScheme();
    }
    DataSize targetResultSizeToUse = Optional.ofNullable(targetResultSize)
            .map(size -> Ordering.natural().min(size, MAX_TARGET_RESULT_SIZE))
            .orElse(defaultTargetResultSize);
    asyncQueryResults(query, token, maxWait, targetResultSizeToUse, uriInfo, proto, asyncResponse);
}
```

Query.waitForDispatched above leads into DispatchManager.createQuery, which hands the real work off to an asynchronous thread running createQueryInternal. createQueryInternal covers lexical analysis, syntactic parsing, semantic analysis, resource-group selection, execution-plan generation, and more (the actual implementations live in the individual classes; this method merely strings them together, almost like procedural code).

```java
/**
 * Creates and registers a dispatch query with the query tracker. This method
 * will never fail to register a query with the query tracker. If an error
 * occurs while creating a dispatch query, a failed dispatch will be created
 * and registered.
 */
private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
{
    Session session = null;
    PreparedQuery preparedQuery = null;
    try {
        if (query.length() > maxQueryLength) {
            int queryLength = query.length();
            query = query.substring(0, maxQueryLength);
            throw new PrestoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
        }

        // decode session
        session = sessionSupplier.createSession(queryId, sessionContext);

        // prepare query
        preparedQuery = queryPreparer.prepareQuery(session, query);

        // select resource group
        Optional<String> queryType = getQueryType(preparedQuery.getStatement().getClass()).map(Enum::name);
        SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                sessionContext.getIdentity().getPrincipal().isPresent(),
                sessionContext.getIdentity().getUser(),
                Optional.ofNullable(sessionContext.getSource()),
                sessionContext.getClientTags(),
                sessionContext.getResourceEstimates(),
                queryType));

        // apply system default session properties (does not override user set properties)
        session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());

        // mark existing transaction as active
        transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);

        DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                session,
                query,
                preparedQuery,
                slug,
                selectionContext.getResourceGroupId());

        boolean queryAdded = queryCreated(dispatchQuery);
        if (queryAdded && !dispatchQuery.isDone()) {
            submitQuerySync(dispatchQuery, selectionContext);
        }
    }
    catch (Throwable throwable) {
        // creation must never fail, so register a failed query in this case
        if (session == null) {
            session = Session.builder(new SessionPropertyManager())
                    .setQueryId(queryId)
                    .setIdentity(sessionContext.getIdentity())
                    .setSource(sessionContext.getSource())
                    .build();
        }
        Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
        DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
        queryCreated(failedDispatchQuery);
    }
}
```

As you can see, createQueryInternal starts by enforcing a limit on the SQL text length. It then uses the SessionContext and QueryId obtained earlier to construct the Session object, which carries the queryId, catalog, schema, system properties, connector properties, client tags, and so on.

```java
public final class Session
{
    private final QueryId queryId;
    private final Optional<TransactionId> transactionId;
    private final boolean clientTransactionSupport;
    private final Identity identity;
    private final Optional<String> source;
    private final Optional<String> catalog;
    private final Optional<String> schema;
    private final SqlPath path;
    private final TimeZoneKey timeZoneKey;
    private final Locale locale;
    private final Optional<String> remoteUserAddress;
    private final Optional<String> userAgent;
    private final Optional<String> clientInfo;
    private final Optional<String> traceToken;
    private final Optional<String> labelInfo;
    private Set<String> clientTags;
    private final Set<String> clientCapabilities;
    private final ResourceEstimates resourceEstimates;
    private final long startTime;
    private Map<String, String> systemProperties;
    private Map<CatalogName, Map<String, String>> connectorProperties;
    private final Map<String, Map<String, String>> unprocessedCatalogProperties;
    private final SessionPropertyManager sessionPropertyManager;
    private final Map<String, String> preparedStatements;
}
```

Next comes turning the SQL into an AST. On the surface this is a single line of code, but it spans several key steps: lexical analysis, syntactic parsing, and AST construction. The returned PreparedQuery wraps a Statement object (the parsed AST). Different kinds of SQL map to different Statement subclasses — a SELECT becomes a Query, a CREATE TABLE becomes a CreateTable, and so on — and the subsequent semantic analysis visits this AST.

```java
// prepare query
preparedQuery = queryPreparer.prepareQuery(session, query);
```
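To make the "one Statement subclass per SQL kind" idea concrete, here is a deliberately naive, invented miniature. The real mapping is decided by the ANTLR grammar rules and AstBuilder, not by string prefixes:

```java
// Invented miniature of Presto's Statement hierarchy, showing how each kind of
// SQL maps to a different AST root class (Query, CreateTable, ...).
public class StatementDemo {
    interface Statement {}
    static final class Query implements Statement {}
    static final class CreateTable implements Statement {}

    // Hypothetical stand-in for the parser's dispatch (string matching is NOT
    // how the real parser works; it follows grammar rules in SqlBase.g4).
    static Statement parse(String sql) {
        String upper = sql.trim().toUpperCase();
        if (upper.startsWith("SELECT")) {
            return new Query();
        }
        if (upper.startsWith("CREATE TABLE")) {
            return new CreateTable();
        }
        throw new IllegalArgumentException("unsupported: " + sql);
    }

    public static void main(String[] args) {
        System.out.println(parse("SELECT 1").getClass().getSimpleName()); // Query
        System.out.println(parse("CREATE TABLE t (x int)").getClass().getSimpleName()); // CreateTable
    }
}
```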

When the SQL comes in, it is handed to SqlParser, whose invokeParser method drives the parse:

```java
private Node invokeParser(String name, String sql, Function<SqlBaseParser, ParserRuleContext> parseFunction, ParsingOptions parsingOptions)
{
    try {
        SqlBaseLexer lexer = new SqlBaseLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
        CommonTokenStream tokenStream = new CommonTokenStream(lexer);
        SqlBaseParser parser = new SqlBaseParser(tokenStream);

        // Override the default error strategy to not attempt inserting or deleting a token.
        // Otherwise, it messes up error reporting
        parser.setErrorHandler(new DefaultErrorStrategy()
        {
            @Override
            public Token recoverInline(Parser recognizer)
                    throws RecognitionException
            {
                if (nextTokensContext == null) {
                    throw new InputMismatchException(recognizer);
                }
                else {
                    throw new InputMismatchException(recognizer, nextTokensState, nextTokensContext);
                }
            }
        });

        parser.addParseListener(new PostProcessor(Arrays.asList(parser.getRuleNames()), parser));

        lexer.removeErrorListeners();
        lexer.addErrorListener(LEXER_ERROR_LISTENER);

        parser.removeErrorListeners();
        if (enhancedErrorHandlerEnabled) {
            parser.addErrorListener(PARSER_ERROR_HANDLER);
        }
        else {
            parser.addErrorListener(LEXER_ERROR_LISTENER);
        }

        ParserRuleContext tree;
        try {
            // first, try parsing with potentially faster SLL mode
            parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
            tree = parseFunction.apply(parser);
        }
        catch (ParseCancellationException ex) {
            // if we fail, parse with LL mode
            tokenStream.seek(0); // rewind input stream
            parser.reset();
            parser.getInterpreter().setPredictionMode(PredictionMode.LL);
            tree = parseFunction.apply(parser);
        }

        return new AstBuilder(parsingOptions).visit(tree);
    }
    catch (StackOverflowError e) {
        throw new ParsingException(name + " is too large (stack overflow while parsing)");
    }
}
```

ANTLR first performs lexical analysis, turning the stream of characters into a stream of tokens:

```java
SqlBaseLexer lexer = new SqlBaseLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
CommonTokenStream tokenStream = new CommonTokenStream(lexer);
```
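To see what "characters to tokens" means, here is a hand-rolled toy lexer (stdlib only, names invented). The real SqlBaseLexer is generated by ANTLR from SqlBase.g4 and is far more sophisticated; this only shows the idea:

```java
import java.util.ArrayList;
import java.util.List;

// Toy lexer: splits a character stream into word tokens and single-character
// operator/punctuation tokens, skipping whitespace.
public class LexerDemo {
    static List<String> tokenize(String sql) {
        List<String> tokens = new ArrayList<>();
        int i = 0;
        while (i < sql.length()) {
            char c = sql.charAt(i);
            if (Character.isWhitespace(c)) {
                i++;
                continue;
            }
            if (Character.isLetterOrDigit(c) || c == '_') {
                // consume an identifier/keyword/number token
                int start = i;
                while (i < sql.length() && (Character.isLetterOrDigit(sql.charAt(i)) || sql.charAt(i) == '_')) {
                    i++;
                }
                tokens.add(sql.substring(start, i));
            }
            else {
                // everything else becomes a single-character token
                tokens.add(String.valueOf(c));
                i++;
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("SELECT a FROM t WHERE a = 1"));
        // [SELECT, a, FROM, t, WHERE, a, =, 1]
    }
}
```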

The tokenStream produced by the lexer is used to construct the SqlBaseParser; addParseListener and addErrorListener register listeners for parse-time events. The code then sets the PredictionMode on the parser's ATN interpreter (the ATN is the graph data structure ANTLR uses to represent the grammar's state machine). The two modes are SLL and LL — LL being a top-down parser for context-free grammars that consumes input left to right and produces a leftmost derivation (the author's understanding of the two is limited; interested readers can dig in on their own). As the code shows, if parsing fails in SLL mode it immediately falls back to LL mode.

```java
try {
    // first, try parsing with potentially faster SLL mode
    parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
    tree = parseFunction.apply(parser);
}
catch (ParseCancellationException ex) {
    // if we fail, parse with LL mode
    tokenStream.seek(0); // rewind input stream
    parser.reset();
    parser.getInterpreter().setPredictionMode(PredictionMode.LL);
    tree = parseFunction.apply(parser);
}
```

Parsing yields an ANTLR parse tree, a ParserRuleContext (a subclass of ANTLR's SyntaxTree); the parse follows the lexical and grammar rules defined in the SqlBase.g4 file (learning ANTLR is strongly recommended — it helps a great deal when reading SQL-parsing source code). We can write a SQL statement and use an ANTLR plugin to view the tree it generates:

```sql
SELECT A FROM TEST_TABLE WHERE A = 'abc' GROUP BY A
```

After ANTLR finishes, the resulting parse tree is handed to AstBuilder to visit (Presto makes heavy use of the visitor pattern). AstBuilder walks ANTLR's tree and produces Presto's own AST, which is much easier to read and manipulate; the traversal is a top-down recursive visit of each child node. A rough sketch of the logic looks like this:

AstBuilder extends the SqlBaseBaseVisitor generated by ANTLR 4 and overrides its visitXXX methods.


For exactly how AstBuilder visits each node via the visitor pattern, readers are encouraged to step through the source in a debugger — honestly, prose does not convey it well (the author's limited writing ability being part of the reason), and debugging it yourself leaves a much deeper impression.
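The visitor pattern itself can be shown in a few lines without any Presto or ANTLR dependency. The node and visitor types below are invented for illustration; AstBuilder plays the same role over the ANTLR parse tree, returning a new Presto AST node from each visitXXX call:

```java
// Minimal, self-contained sketch of the visitor pattern that AstBuilder
// relies on. All names here are invented for illustration.
public class VisitorDemo {
    interface Node {
        <R> R accept(Visitor<R> v);
    }

    static final class Literal implements Node {
        final long value;
        Literal(long value) { this.value = value; }
        public <R> R accept(Visitor<R> v) { return v.visitLiteral(this); }
    }

    static final class Add implements Node {
        final Node left;
        final Node right;
        Add(Node left, Node right) { this.left = left; this.right = right; }
        public <R> R accept(Visitor<R> v) { return v.visitAdd(this); }
    }

    interface Visitor<R> {
        R visitLiteral(Literal node);
        R visitAdd(Add node);
    }

    // Like AstBuilder, this visitor walks the tree top-down and builds its
    // result from the children's results (pretty-printing instead of AST nodes).
    static final class Printer implements Visitor<String> {
        public String visitLiteral(Literal node) { return Long.toString(node.value); }
        public String visitAdd(Add node) {
            return "(" + node.left.accept(this) + " + " + node.right.accept(this) + ")";
        }
    }

    static String print(Node n) { return n.accept(new Printer()); }

    public static void main(String[] args) {
        Node tree = new Add(new Literal(1), new Add(new Literal(2), new Literal(3)));
        System.out.println(print(tree)); // (1 + (2 + 3))
    }
}
```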

That wraps up the journey from SQL to the AST. The steps that follow — semantic analysis, execution-plan generation, and plan scheduling — will have to wait for a future post. Corrections to anything above are welcome.


Published: 2024-02-25 02:31:20
Original link: https://www.elefans.com/category/jswz/34/1697483.html