When we type hive in a terminal, the script at $HIVE_HOME/bin/hive is executed. That script in turn runs each of the .sh scripts under bin/ext; this directory contains the scripts used to start the various services Hive depends on.

Liangs-MacBook-Pro:ext pwrliang$ ll|grep .sh
-rwxr-xr-x  1 pwrliang  staff  1679 Jan 26 16:13 beeline.sh
-rwxr-xr-x  1 pwrliang  staff  1028 Jan 26 16:13 cli.sh
-rwxr-xr-x  1 pwrliang  staff  3199 Jan 26 16:13 debug.sh
-rwxr-xr-x  1 pwrliang  staff  1456 Jan 26 16:13 help.sh
-rwxr-xr-x  1 pwrliang  staff  1187 Jan 26 16:13 hiveburninclient.sh
-rwxr-xr-x  1 pwrliang  staff  1214 Jan 26 16:13 hiveserver.sh
-rwxr-xr-x  1 pwrliang  staff  1118 Jan 26 16:13 hiveserver2.sh
-rwxr-xr-x  1 pwrliang  staff  1625 Jan 26 16:13 hwi.sh
-rwxr-xr-x  1 pwrliang  staff  1424 Jan 26 16:13 jar.sh
-rwxr-xr-x  1 pwrliang  staff  1220 Jan 26 16:13 lineage.sh
-rwxr-xr-x  1 pwrliang  staff  1271 Jan 26 16:13 metastore.sh
-rwxr-xr-x  1 pwrliang  staff  1101 Jan 26 16:13 metatool.sh
-rwxr-xr-x  1 pwrliang  staff  1073 Jan 26 16:13 orcfiledump.sh
-rwxr-xr-x  1 pwrliang  staff  1059 Jan 26 16:13 rcfilecat.sh
-rwxr-xr-x  1 pwrliang  staff  1080 Jan 26 16:13 schemaTool.sh
-rwxr-xr-x  1 pwrliang  staff  1266 Jan 26 16:13 version.sh

Of these scripts, cli.sh is the entry point we care about: it invokes the hadoop jar command to launch "org.apache.hadoop.hive.cli.CliDriver".

Line 620 of CliDriver is the main function, which instantiates a CliDriver and calls its run method.

  public static void main(String[] args) throws Exception {
    int ret = new CliDriver().run(args);
    System.exit(ret);
  }

In the CliDriver constructor, a SessionState instance is obtained. SessionState encapsulates the data shared within a session. SessionState also supports a thread-static session object, so that the session, along with its configuration, can be accessed from anywhere in the code.

  public CliDriver() {
    SessionState ss = SessionState.get();
    conf = (ss != null) ? ss.getConf() : new Configuration();
    Log LOG = LogFactory.getLog("CliDriver");
    if (LOG.isDebugEnabled()) {
      LOG.debug("CliDriver inited with classpath " + System.getProperty("java.class.path"));
    }
    console = new LogHelper(LOG);
  }

Each thread is associated with a SessionStates object, which holds a SessionState and a HiveConf instance.

  // SessionState is not available in runtime and Hive.get().getConf() is not safe to call
  private static class SessionStates {
    private SessionState state;
    private HiveConf conf;
    private void attach(SessionState state) {
      this.state = state;
      attach(state.getConf());
    }
    private void attach(HiveConf conf) {
      this.conf = conf;
      ClassLoader classLoader = conf.getClassLoader();
      if (classLoader != null) {
        Thread.currentThread().setContextClassLoader(classLoader);
      }
    }
  }

When a SessionStates is first instantiated, both its state and conf fields are null. So when CliDriver calls SessionState.get() to obtain the SessionStates associated with the current thread, it gets back a null state. Finding state to be null, CliDriver falls back to creating a new Configuration instance.
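The thread-local lookup described above can be sketched as follows. This is a minimal standalone illustration of the pattern, not Hive's actual code; the class and method names are simplified stand-ins:

```java
// A minimal sketch of the thread-local session pattern: each thread lazily
// gets its own SessionStates holder whose state field starts out null.
public class ThreadLocalSessionDemo {
  public static class SessionState { }

  public static class SessionStates {
    SessionState state;                       // null until a session is started
    void attach(SessionState s) { this.state = s; }
  }

  // One SessionStates per thread, analogous to the thread-local in SessionState.
  private static final ThreadLocal<SessionStates> tss =
      ThreadLocal.withInitial(SessionStates::new);

  public static SessionState get() { return tss.get().state; }

  public static void attach(SessionState s) { tss.get().attach(s); }

  public static void main(String[] args) {
    // First access: the holder exists but state is still null, which is why
    // a caller like CliDriver must fall back to a fresh Configuration.
    System.out.println(get() == null);   // true

    SessionState ss = new SessionState();
    attach(ss);
    System.out.println(get() == ss);     // true
  }
}
```

Because the holder is per-thread, attaching a session on one thread never affects lookups on another.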

Once the CliDriver has been constructed, its run method is executed. In run, an OptionsProcessor is instantiated, Log4j is initialized, and a CliSessionState is created. CliSessionState extends SessionState, adding fields such as database, execString, and fileName.

public class CliSessionState extends SessionState {
  /**
   * -database option if any that the session has been invoked with.
   */
  public String database;

  /**
   * -e option if any that the session has been invoked with.
   */
  public String execString;

  /**
   * -f option if any that the session has been invoked with.
   */
  public String fileName;

  /**
   * properties set from -hiveconf via cmdline.
   */
  public Properties cmdProperties = new Properties();

  /**
   * -i option if any that the session has been invoked with.
   */
  public List<String> initFiles = new ArrayList<String>();

  public CliSessionState(HiveConf conf) {
    super(conf);
  }
....
}

Creating a CliSessionState requires a HiveConf instance. HiveConf extends Hadoop's Configuration class. HiveConf's constructor takes SessionState.class in order to locate the hiveJar path ("apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar").
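Deriving a jar path from a class works via the class's code source. The sketch below is a hypothetical illustration of the general JDK mechanism, not HiveConf's actual implementation:

```java
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;

// Illustration: locating the jar (or classes directory) that a class was
// loaded from, similar in spirit to how HiveConf derives hiveJar from
// SessionState.class.
public class JarLocator {
  // Returns the code-source location of the given class, or null if the
  // class has no code source (e.g. bootstrap-loaded JDK classes).
  public static String locate(Class<?> cls) {
    ProtectionDomain pd = cls.getProtectionDomain();
    CodeSource cs = pd.getCodeSource();
    if (cs == null) return null;
    URL loc = cs.getLocation();
    return loc != null ? loc.getPath() : null;
  }

  public static void main(String[] args) {
    // JDK classes are typically loaded by the bootstrap loader and have
    // no code source, so this usually prints null.
    System.out.println(locate(String.class));
    // An application class resolves to its own jar or classes directory.
    System.out.println(locate(JarLocator.class));
  }
}
```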

Once the CliSessionState has been instantiated, it is passed to SessionState.start to start the session. On session start, the CliSessionState is first attached to the state field of the SessionStates associated with the current thread. Then getMSC is called to obtain a SessionHiveMetaStoreClient instance. The IMetaStoreClient interface exposes methods for retrieving metadata such as database names, table names, and partition information.

public static SessionState start(SessionState startSs) {
    setCurrentSessionState(startSs); // assign the CliSessionState instance to the state field of the thread's SessionStates, implemented by SessionStates.attach

    ....

    // Get the following out of the way when you start the session these take a
    // while and should be done when we start up.
    try {
      // Hive object instance should be created with a copy of the conf object. If the conf is
      // shared with SessionState, other parts of the code might update the config, but
      // Hive.get(HiveConf) would not recognize the case when it needs refreshing
      Hive.get(new HiveConf(startSs.conf)).getMSC();
      UserGroupInformation sessionUGI = Utils.getUGI();
      FileSystem.get(startSs.conf);

      // Create scratch dirs for this session
      startSs.createSessionDirs(sessionUGI.getShortUserName());

    ....
    return startSs;
  }

Once the session has started, CliDriver's executeDriver method is called. This method creates another CliDriver instance, then reads the SQL entered by the user and hands it to that CliDriver's processLine method.

processLine splits the user's input on semicolons (";"); each fragment is called oneCmd. A fragment ending in '\' indicates an escaped semicolon inside a statement, so such fragments are joined back together with the next one. The complete statement is accumulated in the command variable and handed to processCmd.

      for (String oneCmd : line.split(";")) {

        if (StringUtils.endsWith(oneCmd, "\\")) {
          command += StringUtils.chop(oneCmd) + ";";
          continue;
        } else {
          command += oneCmd;
        }
        if (StringUtils.isBlank(command)) {
          continue;
        }

        ret = processCmd(command);
        //wipe cli query state
        SessionState ss = SessionState.get();
        ss.setCommandType(null);
        command = "";
        lastRet = ret;
        boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
        if (ret != 0 && !ignoreErrors) {
          CommandProcessorFactory.clean((HiveConf) conf);
          return ret;
        }
      }
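The splitting and re-joining rule above can be exercised in isolation. The helper below is a hypothetical, self-contained simulation of that loop, not Hive code; it collects the finished statements into a list instead of executing them:

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of processLine's splitting rule: input is split on ';', and a
// fragment ending in '\' is an escaped semicolon, so the '\' is chopped off,
// the ';' restored, and the fragment joined with the next one.
public class LineSplitter {
  public static List<String> split(String line) {
    List<String> commands = new ArrayList<>();
    String command = "";
    for (String oneCmd : line.split(";")) {
      if (oneCmd.endsWith("\\")) {
        // drop the '\' and restore the ';' it escaped
        command += oneCmd.substring(0, oneCmd.length() - 1) + ";";
        continue;
      } else {
        command += oneCmd;
      }
      if (command.trim().isEmpty()) {
        continue;
      }
      commands.add(command);
      command = "";
    }
    return commands;
  }

  public static void main(String[] args) {
    // "\;" escapes the separator, so the first statement keeps its ';'
    System.out.println(split("select 'a\\;b'; show tables"));
  }
}
```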

In processCmd, the CliSessionState instance is retrieved and the command about to be executed is stored in the session's lastCommand field, preserving the most recently executed statement. The command is not necessarily SQL: it may be exit or quit, or a shell command prefixed with '!'. Here we only follow the SQL path.

public int processCmd(String cmd) {
    CliSessionState ss = (CliSessionState) SessionState.get();
    ss.setLastCommand(cmd);
    // Flush the print stream, so it doesn't include output from the last command
    ss.err.flush();
    String cmd_trimmed = cmd.trim();
    String[] tokens = tokenizeCmd(cmd_trimmed);
    if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
    ....
    } else if (tokens[0].equalsIgnoreCase("source")) {
    ....
    } else if (cmd_trimmed.startsWith("!")) {
    ....
    }  else { // local mode
      try {
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf); // this initializes the Driver instance
        ret = processLocalCmd(cmd, proc, ss);
      } catch (SQLException e) {
        console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
    }

    return ret;
}

The SQL is then handed on to processLocalCmd, which takes the SQL to execute (cmd), a CommandProcessor (proc), and the CliSessionState (ss). If proc is an instance of Driver, the SQL is passed to the run method of org.apache.hadoop.hive.ql.Driver for execution.

int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
    int tryCount = 0;
    boolean needRetry;
    int ret = 0;

    do {
      try {
        needRetry = false;
        if (proc != null) {
          if (proc instanceof Driver) {
            // this branch handles SQL statements
            Driver qp = (Driver) proc;
            PrintStream out = ss.out;
            long start = System.currentTimeMillis();
            if (ss.getIsVerbose()) {
              out.println(cmd);
            }
            qp.setTryCount(tryCount);
            ret = qp.run(cmd).getResponseCode();
            if (ret != 0) {
              qp.close();
              return ret;
            }
            ....
          } else {
            // this branch handles built-in commands such as set
            ....
          }
        }
      } catch (CommandNeedRetryException e) {
        console.printInfo("Retry query with a different approach...");
        tryCount++;
        needRetry = true;
      }
    } while (needRetry);

    return ret;
  }

The figure below shows the call stack from CliDriver.main down to Driver.run.

Figure 1: Call stack

This post traced the path from typing the hive command to receiving the SQL statement to be executed. The actual execution of the SQL is the responsibility of Driver's run method; in the next post we will begin by analyzing the Driver class.

pwrliang Hive
