Impala JDBC 动态设置查询选项

5 分钟阅读

前篇文章 Impala 设置查询选项 介绍常用设置查询选项的方法,本文详细介绍实现动态查询选项过程。

AOP 概念

在我们开始之前,让我们对术语和核心概念做一个回顾:

  • Aspect:切面,由切点和增强相结合而成,定义增强应用到哪些切点上。即一个横跨多个核心逻辑的功能,或者称之为系统关注点。
  • Joinpoint:连接点,这是程序执行过程中的一个特殊点,例如方法执行,构造函数调用或字段分配。
  • Pointcut:切入点,一个匹配连接点的正则表达式。 每当任何连接点匹配一个切入点时,就执行与该切入点相关联的指定增强。
  • Advice:增强,指特定连接点上执行的动作,实际中想要添加的功能,如日志、权限校验。有5种:@Before @After @AfterReturning @AfterThrowing @Around
  • Weaving:织入,即对方法的增强,将切面的代码织入(应用)到目标函数的过程。

Spring AOP 和 AspectJ

下表总结了 Spring AOP 和 AspectJ 之间的关键区别:

Spring AOP AspectJ
在纯 Java 中实现 使用 Java 编程语言的扩展实现
不需要单独的编译过程 除非设置 LTW,否则需要 AspectJ 编译器 (ajc)
只能使用运行时织入 运行时织入不可用。支持编译时、编译后和加载时织入
功能不强-仅支持方法级编织 更强大 - 可以编织字段、方法、构造函数、静态初始值设定项、最终类/方法等……。
只能在由 Spring 容器管理的 bean 上实现 可以在所有域对象上实现
仅支持方法执行切入点 支持所有切入点
代理是由目标对象创建的, 并且切面应用在这些代理上 在执行应用程序之前 (在运行时) 前, 各方面直接在代码中进行织入
比 AspectJ 慢多了 更好的性能
易于学习和应用 相对于 Spring AOP 来说更复杂

选择正确的框架

选择那个框架很大程度上取决于我们的要求:

  • 框架: 如果应用程序没有使用 Spring 框架, 那么我们就别无选择, 只能放弃使用 Spring AOP 的想法, 因为它无法管理任何超出 Spring 容器范围的东西。但是, 如果我们的应用程序是完全使用 Spring 框架创建的, 那么我们可以使用 Spring AOP, 因为它是简单的学习和应用。
  • 灵活性: 由于有限的 joinpoint 支持, Spring AOP 不是一个完整的 AOP 解决方案, 但它解决了程序员面临的最常见的问题。尽管如果我们想深入挖掘和开发 AOP 以达到其最大能力, 并希望得到广泛的可用 joinpoints 的支持, 那么最好选择 AspectJ。
  • 性能: 如果我们使用的是有限的切面, 那么就会有细微的性能差异。但有时, 应用程序有成千上万个切面的情况。我们不想在这样的情况下使用运行时编织, 所以最好选择 AspectJ。AspectJ 已知的速度比 Spring AOP 快8到35倍。
  • 两者的最佳之处: 这两个框架都是完全兼容的。我们总是可以利用 Spring AOP;只要有可能, 仍然可以在不支持前者的地方使用 AspectJ 获得支持。

基于我们的需求:对 Impala jdbc 驱动包的内容做增强。不受 Spring 管理,AspectJ 支持这种方式,并提供了 Compile-time weavingPost-compile weavingLoad-time weaving 三种织入方式,从开发部署便捷性上这里选择 Post-compile weaving 方式。

环境

Spring Boot: 2.1.4.RELEASE

Impala JDBC: 2.6.17.1020

步骤:

步骤中省略了数据源的配置过程。驱动可以开启日志,有利于我们跟踪分析请求。

1
 jdbc:impala://localhost:21050;LogLevel=6;LogPath=/tmp/impala

1. 添加 aspects 支持

首先,我们通过 Maven 引入Spring 对 aspects 的支持:

1
2
3
4
5
6
7
8
9
10
11
    <!-- aspects -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aspects</artifactId>
    </dependency>
    <dependency>
      <groupId>org.aspectj</groupId>
      <artifactId>aspectjrt</artifactId>
      <version>${aspectj.version}</version>
      <scope>compile</scope>
    </dependency>

上述依赖会自动引入 AspectJ,使用 AspectJ 实现 AOP 比较方便。

2. 定义切面类

然后,我们定义一个HS2ClientAspect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.cloudera.example.aspects;

import com.cloudera.example.helper.HS2ClientHelper;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

/**
 * HS2ClientWrapper 类的切面。
 * <p>
 */
@Aspect
public class HS2ClientAspect {

  private final Logger log = LoggerFactory.getLogger(this.getClass());


  /**
   * 切入点,一个匹配连接点的正则表达式。
   */
  @Pointcut("execution(public * com.cloudera.impala.hivecommon.api.HS2ClientWrapper" +
      ".ExecuteStatement(..))")
  public void hs2Pointcut() {
    // Method is empty as this is just a Pointcut, the implementations are in the advices.
  }


  /**
   * 前置增强,目标方法执行前之前执行。
   *
   * @param joinPoint 连接点
   */
  @Before("hs2Pointcut()")
  public void optimizeQueryOption(JoinPoint joinPoint) {
    if (log.isDebugEnabled()) {
      log.debug("Enter: {}.{}() with argument[s] = {}",
          joinPoint.getSignature().getDeclaringTypeName(),
          joinPoint.getSignature().getName(), Arrays.toString(joinPoint.getArgs()));
    }
    // 执行前,设置查询选项
    HS2ClientHelper.getInstance().addQueryOption(joinPoint.getSignature().getName(),
        joinPoint.getArgs());
  }
}

观察hs2Pointcut() 方法,定义了一个@Pointcut注解,后面的字符串是告诉 AspectJ 应该如何匹配一个切入点,注意这步可以省略,可以字符串直接写在Advice上,如果只有一个 Advice 可以使用这种方法。

1
2
3
4
5
6
  @Before("execution(public * com.cloudera.impala.hivecommon.api.HS2ClientWrapper" +
      ".ExecuteStatement(..))")
  public void optimizeQueryOption(JoinPoint joinPoint) {
	// ....
  }

观察optimizeQueryOption()方法,我们定义了一个@Before注解,后面的字符串(这里也可以直接写匹配表达式)是告诉 AspectJ 应该在何处执行该方法,这里的意思是:执行HS2ClientWrapperExecuteStatement方法前执行optimizeQueryOption()代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  /**
   * 添加优化查询选项参数
   *
   * @param req ExecuteStatementReq
   */
  public void addOptimizeQueryOption(TExecuteStatementReq req) {
    // 1. 获得请求的查询配置属性
    Map<String, String> confOverlay = req.getConfOverlay();
    if (null == confOverlay) {
      confOverlay = Maps.newHashMap();
      req.setConfOverlay(confOverlay);
    }
    // 2. 获得sql语句的唯一标识
    byte[] guid = req.getSessionHandle().getSessionId().getGuid();
    String sessionId = StringHelper.getGuid(guid);
    // a. 从 sql 语句提取 traceId(可能取不到,驱动会对 sql 自动翻译)
    String statement = req.getStatement();
    String traceId = SQLHelper.getTraceIdBySql(statement);
    // b. 获取当前 session 的优化参数 Map<sessionId, traceId>
    if (StringUtils.isBlank(traceId)) {
      traceId = sessionId;
    }
    // 3. 获取查询选项并配置
    if (StringUtils.isNotBlank(traceId)) {
      Map<String, String> queryOptions = SQLHelper.getConfByTraceId(traceId);
      if (null != queryOptions) {
        confOverlay.putAll(queryOptions);
      }
    }
    log.info("Optimize query option: sessionid={}, statement={}, confOverlay={}", sessionId,
        statement, confOverlay);
  }

3. 添加 aspects 编译插件

紧接着,添加 aspectj 插件用于织入代码。aspectj 插件是支持 jar 和 .class 做织入的。这里未使用的原因是驱动包太复杂织入失败,实际上我们仅需要对一类做织入 ,通过maven-dependency-plugin插件配合自动完成编译后织入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
<!-- Unzip the classes to be woven from the JAR and do so before compiling -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <executions>
        <execution>
            <id>unpack</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <artifactItem>
                        <groupId>Impala</groupId>
                        <artifactId>ImpalaJDBC${jdbc.version}</artifactId>
                        <version>${impala.jdbc.version}</version>
                        <type>jar</type>
                        <!-- 要织入的类 -->
                        <includes>com/cloudera/impala/hivecommon/api/HS2ClientWrapper*.*</includes>
                        <outputDirectory>${project.build.directory}/unwoven-classes</outputDirectory>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>aspectj-maven-plugin</artifactId>
    <version>1.11</version>
    <configuration>
        <complianceLevel>1.8</complianceLevel>
        <verbose>true</verbose>
        <!-- Weaving already compiled classes -->
        <weaveDirectories>
            <weaveDirectory>${project.build.directory}/unwoven-classes</weaveDirectory>
        </weaveDirectories>
        <aspectLibraries>
            <aspectLibrary>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aspects</artifactId>
            </aspectLibrary>
        </aspectLibraries>
    </configuration>
    <dependencies>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjtools</artifactId>
            <version>${aspectj.version}</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>${aspectj.version}</version>
        </dependency>
    </dependencies>
    <executions>
        <execution>
            <!-- Compile and weave aspects after all classes compiled by javac -->
            <!--                        <phase>process-classes</phase>-->
            <goals>
                <goal>compile</goal>
                <goal>test-compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>
</plugins>

执行编译:mvn clean compile 或者点击 IDE 中 maven 插件的 compole

编译后classes应该有这个类,这是织入之后的类,

aspectj-pcw-impala-jdbc-hs2client

反编译后找到ExecuteStatement方法,可以看到前两行是织入的代码。

1
2
3
4
5
6
7
8
9
10
11
12
  public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq arg0) throws TException {
    JoinPoint var3 = Factory.makeJP(ajc$tjp_0, this, this, arg0);
    HS2ClientAspect.aspectOf().optimizeQueryOption(var3);
    LogUtilities.logFunctionEntrance(this.m_logger, new Object[]{arg0});
    HS2ClientWrapper.TCLIFunction var2 = new HS2ClientWrapper.TCLIFunction() {
      public TExecuteStatementResp clientCall(Object var1, HS2ClientWrapper var2) throws TException {
        TExecuteStatementResp var3 = var2.callExecuteStatement((TExecuteStatementReq)var1);
        return var3;
      }
    };
    return (TExecuteStatementResp)this.executeWithRetry(var2, arg0, this);
  }

4. 测试

最后测试类,可以通过发送set语句验证是否生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package com.cloudera.example.mapper;


import static org.junit.Assert.assertEquals;

import com.cloudera.example.ImpalaJdbcExampleApplicationTests;
import com.cloudera.example.helper.SQLHelper;
import com.google.common.collect.Maps;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

public class ImpalaQueryOptionsTest extends ImpalaJdbcExampleApplicationTests {

  @Autowired
  DataSource dataSource;

  @Test
  public void testQueryOptions() {
    // 1. 先把查询选项放到缓存。
    // 2. 发送 set 请求,获得当前查询选项
    // 3. aspectj 拦截 ExecuteStatement 方法设置查询选项参数 confOverlay
    // 4. 验证结果,从中取出设置的查询选项做判断

    // 1. 设置查询选项
    Map<String, String> queryOptions = Maps.newHashMap();
    queryOptions.put("request_pool", "mypool");
    queryOptions.put("mt_dop", "2");

    String sqlStatement = "/* id:1 */ set";
    String sessionId = null;
    Connection con = null;
    try {
      con = dataSource.getConnection();

      // 设置查询选项
      SQLHelper.setSqlConf("1", queryOptions);
      // 当前session也设置一下
      sessionId = SQLHelper.getSessionId(con);
      if (StringUtils.isNotBlank(sessionId)) {
        SQLHelper.setSqlConf(sessionId, queryOptions);
      }

      //  2. 发送查询请求
      Statement stmt = con.createStatement();
      ResultSet rs = stmt.executeQuery(sqlStatement);

      System.out.println("\n== Begin Query Results ======================");
      Map<String, String> result = Maps.newHashMap();
      // print the results to the console
      while (rs.next()) {
        String option = rs.getString(1);
        String value = rs.getString(2);
        result.put(option, value);
      }
      System.out.println("result: " + result);
      System.out.println("== End Query Results =======================\n\n");
      
      // 4. 验证,从返回结果里获取配置的查询选项并比较
      queryOptions.forEach((k, v) -> {
        String actual = result.get(k.toUpperCase());
        assertEquals("failure - strings are not equal", v, actual);
      });
    } catch (SQLException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      SQLHelper.clearByTraceId(sessionId);
      try {
        con.close();
      } catch (Exception e) {
        // swallow
      }
    }
  }

}

日志里会有这条记录,如果没有请检查是否漏掉了编译,IDE 不会使用我们配置的插件编译。

//tmp/impala/Impala_connection_0.log:

1
TRACE 1 com.cloudera.impala.hivecommon.api.HS2ClientWrapper.ExecuteStatement(TExecuteStatementReq(sessionHandle:TSessionHandle(sessionId:THandleIdentifier(guid:DC 4C D9 5A 49 81 45 4D A8 0D 7C D9 11 3B E7 3A, secret:0E DD 99 AE FF 4E 42 60 96 27 9D 10 A1 20 B2 91)), statement:set, confOverlay:{request_pool=mypool, mt_dop=2}, runAsync:true)): +++++ enter +++++

注意看confOverlay的值。

小结

本文先介绍了 AOP 及 Aspect 相关知识,接着介绍动态查询选项的工程实现。

参考:

留下评论