HyperSQL调研学习文档(三)

2022-04-08 00:00:00 语句 执行 事务 调用 返回

近调研HyperSQL,把整理的资料记录一下,并分享给大家,由于时间略紧,内容肯定有遗漏和谬误的地方,欢迎大家指正。本人也会持续的修改更新。

4. 数据库启动与建立连接流程
4.1 Server启动流程
当我们使用:



Server server = new Server();
server.setPort(8743);
server.setDatabaseName(0, "test");
server.setDatabasePath(0, "mem:test;sql.enforce_strict_size=true;sql.restrict_exec=true");
server.setLogWriter(null);
server.setErrWriter(null);
server.start();
来启动一个数据库Server端时,应用创建一个线程并启动:



serverThread = new ServerThread("HSQLDB Server ");
serverThread.start();
该ServerThread是Server的内部类,其run方法如下:



public void run() {
Server.this.run();
printWithThread("ServerThread.run() exited");
}
调用所在的Server实例的run方法:



private void run() {


StopWatch sw;
ThreadGroup tg;
String tgName;


printWithThread("run() entered");
print("Initiating startup sequence...");
printProperties();


sw = new StopWatch();


setServerError(null);


try {


// Faster init first:
// It is huge waste to fully open the databases, only
// to find that the socket address is already in use
openServerSocket();
} catch (Exception e) {
setServerError(e);
printError("run()/openServerSocket(): ");
printStackTrace(e);
shutdown(true);


return;
}


tgName = "HSQLDB Connections @"
+ Integer.toString(this.hashCode(), 16);
tg = new ThreadGroup(tgName);


tg.setDaemon(false);


serverConnectionThreadGroup = tg;


// Mount the databases this server is supposed to host.
// This may take some time if the databases are not all
// already open.
if (!openDatabases()) {
setServerError(null);
printError("Shutting down because there are no open databases");
shutdown(true);


return;
}


// At this point, we have a valid server socket and
// a valid hosted database set, so its OK to start
// listening for connections.
setState(ServerConstants.SERVER_STATE_ONLINE);
print(sw.elapsedTimeToMessage("Startup sequence completed"));
printServerOnlineMessage();


// isShuttingDown is only read after socket connections are 'accept'ed.
isShuttingDown = false; // In case shutdown was aborted previously.


try {
/*
* This loop is necessary for UNIX w/ Sun Java 1.3 because
* in that case the socket.close() elsewhere will not
* interrupt this accept().
*/
while (socket != null) {
try {
handleConnection(socket.accept());
} catch (java.io.InterruptedIOException iioe) {}
}
} catch (IOException ioe) {
if (getState() == ServerConstants.SERVER_STATE_ONLINE) {
setServerError(ioe);
printError(this + ".run()/handleConnection(): ");
printStackTrace(ioe);
}
} catch (Throwable t) {
printWithThread(t.toString());
} finally {
shutdown(false); // or maybe getServerError() != null?
}
}
1. 其中,首先调用openServerSocket()来查看socket地址是否已经被占用(避免费劲整个启动后才发现端口被占)。若被占就早早报异常;若未被占就启动ServerSocket并监听该端口。



首先初始化一个HsqlSocketFactory
然后获得配置的address和port
然后使用socketFactory.createServerSocket(port, address)来创建一个ServerSocket
2. 若地址未被占用,则ServerSocket创建成功,接下来执行OpenDatabase()过程
a 首先执行setDBInfoArrays()
该方法的作用是获得配置的所有dbNames的数组,然后对每一个dbName从serverProperties获得针对该dbName的配置,将其保存在Server.dbAlias, Server.dbType, Server.dbPath, Server.props等数组中。









b 然后对每一个Server.dbAlias中定义的dbName,委托DatabaseManager.getDatabase(dbType, dbPath, this, dbProps)来获得对应的数据库对象:

DatabaseManager使用一个HashMap来缓存Database,若对于key值(也就是dbPath[i])存在缓存的db,则直接返回。否则首先调用getDatabase(type, path, props)创建一个db,其中通过new Database(type, path, dbName, props)创建一个database并缓存在DatabaseManager中。
然后判断database的状态,若为ONLINE则直接返回database,若为SHUTDOWN则首先执行database.open()将database打开,然后再返回database。database.open()函数首先判断该库是否处于关闭(SHUTDOWN)状态,若是则执行其reopen函数,该函数是database打开的核心函数,其业务逻辑也比较复杂,我们将在下面专门介绍。
3. 所有database开启完毕后,通过:



while (socket != null) {
try {
handleConnection(socket.accept());
} catch (java.io.InterruptedIOException iioe) {}
}
循环来持续监听指定端口上的socket连接,并对其进行处理,方法如下:



public void handleConnection(Socket s) {
Thread t;
Runnable r;
String ctn;




printWithThread("handleConnection(" + s + ") entered");




if (!allowConnection(s)) {
try {
s.close();
} catch (Exception e) {}




printWithThread("allowConnection(): connection refused");
printWithThread("handleConnection() exited");




return;
}




// Maybe set up socket options, SSL
// Session tracing/callbacks, etc.
if (socketFactory != null) {
socketFactory.configureSocket(s);
}




if (serverProtocol == ServerConstants.SC_PROTOCOL_HSQL) {
r = new ServerConnection(s, this);
ctn = ((ServerConnection) r).getConnectionThreadName();
} else {
r = new WebServerConnection(s, (WebServer) this);
ctn = ((WebServerConnection) r).getConnectionThreadName();
}




t = new Thread(serverConnectionThreadGroup, r, ctn);




t.start();
printWithThread("handleConnection() exited");
}
由上可见,对于每一个新到来的socket连接,Server都会视其连接协议新建一个ServerConnection或WebServerConnection线程来提供服务。
至此,服务端启动过程就完毕了,至于ServerConnection或WebServerConnection线程对各种不同的请求是如何提供服务响应的,我们在后面逐个解释。



4.1.1 Database的reopen过程

上面提到了,在打开一个数据库的时候,调用了database.reopen()函数,该函数的总体执行逻辑如下:
1. 首先创建数据库的核心模块结构createObjectStructures()
2. 接着打开该库的database文件并启动日志进程logger.open()
3. 如果是新建数据库,创建默认用户SA和默认Schema: PUBLIC
4. 然后打开大字段处理器lobManager.open()
5. 后初始化CheckPointRunner和TimeoutRunner
下面我们逐个讲解具体步骤。



4.1.1.1 createObjectStructures

本方法负责创建及初始化数据库的各个核心模块,各个模块功能及其初始化执行逻辑概述如下:



PersistentStoreCollectionDatabase负责管理针对该database的持久化存储,其持有PersistentStore的缓存LongkeyHashMap,并委托database.logger来创建新的PersistentStore
HsqlNameManager提供了对于SQL对象的名称管理
LobManager大字段管理器,其中执行resources/lob-schema.sql脚本创建用以管理大字段的schema,表,索引及存储过程。
GranteeManager角色管理器(一个Grantee对象代表一个role或一个user)。其中创建了"PUBLIC", "DBA", "CREATE_SCHEMA", "CHANGE_AUTHORITATION"等角色。
UserManager用户管理器,与该database对应的GranteeManager联系起来,并创建特殊的用户PUBLIC_USER_NAME和SYSTEM_AUTHORIZATION_NAME。其中SYSTEM_AUTHORIZATION_NAME是在GranteeManager中创建的,并在其中维护。
SchemaManager,管理所有Schema相关的数据库对象,并默认创建出PUBLIC及SYSTEM_AUTHORIZATION_NAME的Schema。SchemaObject对象包括:Table, View, Sequence, Procedure, Index, Database等。而Schema代表了一个数据库概念上的SchemaObject对象的集合。
SessionManager,管理指定database的Session,默认先创建一个SysUser。其中提供一个方法newSession来创建一个会话。并将Session对象绑定到该SessionManager中,一般在成功的创建一个Connection以后在内部使用。
collation,对于CHAR和VARCHAR的字节序定义
dbInfo,系统表的管理类,里面放置着所有系统表名字以及生成这些系统表的工厂方法。
txManager,事务管理器,用于支持事务性语句。
后创建lob相关的表以及schema,并创建所有的预定义的系统表。





4.1.1.2 logger.open()

打开指定的database对象的database文件并启动日志进程。如果指定的database对象是一个新的database,则创建其database文件,步骤如下:
1. 根据连接参数的"fileaccess_class_name"和"storage_class_name"来设置fileAccess,默认根据是否是res类型的数据库设置为FileAccessRes或FileUtil的实例。并根据"storage_class_name"定义的类是否RandomAccessInterface的子类,设置isNewStoredFileAccess的值。
2. 判断本库是否是filebased库(file,res),若不是则说明必然是一个新库(mem型每次重新打开都是新库);若是filebased则查看是否能加载dbName.properties文件,是否含有dbName.script或dbName.script.new文件,若不能加载.properties文件或.script和.script.new都不存在,则说明是一个新库。
3. 根据是否是新建的库执行不同的校验及参数设置行为
4. setVariables() 根据database.databaseProperties属性设置database,例如设置默认的表类型,以及txManager的类型等等。
5. 初始化appLog和sqlLog,它们的类型均为SimpleLog,用于记录异常事件。根据不同的日志级别记录不同程度的事件和错误。默认为LOG_NONE,不记录任何异常事件。
6. 如果不是filebased库,到这里就可以返回了,logger打开完毕。
7. 如果"hsqldb.lock_file"参数不为false,则aquireLock(database.getPath()) 创建并获得.lck锁文件
8. 初始化log并执行log.open():
8.a. 首先initParams(),在其中设置scriptFileName=dbPath+".script", logFileName=dbPath+".log"
8.b. 然后从dbPath.properties中读取modified值:(FILES_NEW, FILES_MODIFIED, FILES_MODIFIED_NEW, FILES_NOT_MODIFIED)
• 如果是FILES_NEW,则什么也也不用做;
• 如果是FILES_MODIFIED_NEW,则将.script.new重命名为.script,并将.data.new(若存在)命名为.data,然后执行FILES_NOT_MODIFIED的逻辑;
• 如果是FILES_NOT_MODIFIED,删除.log文件,然后调用processScript()执行.script中的所有SQL语句重构数据库,processScript()的执行逻辑下面详述。
• 如果是FILES_MODIFIED,说明数据库距上次checkPoint以后修改过,因此应该存在.log文件尚未写入到.script中。故在执行完读取数据库后,还需要从.log中恢复数据。下面针对FILES_MODIFIED需要从.log中恢复的情况做介绍:
1> 首先删除所有.new文件,.old文件, .temp目录下的所有文件
2> 然后执行processScript()
根据是否加密,初始化一个ScriptReaderBase的子类scr,读取文件为scriptFileName
创建一个用于执行脚本的sys权限的数据库会话(session)
执行scr.readAll(session):
readDDL(session) 读取用于定义本库结构的DDL语句并执行,从而构建出本库的结构。
readExistingData(session)读取用于操作本库数据的DML语句并执行,从而将本库的数据构建出来。
3> 然后执行processLog(),其动作与processScript()一致,只是读取文件改为logFileName
4> 后,执行一次checkPoint()完成数据一致性持久化。其中将库的完整信息写入.script.new文件中并用其替换掉现存的.script,删除现存的.log文件,然后调用openLog()重新开启dbLogWriter创建一个新的.log文件。

openLog()函数代码如下:



dbLogWriter = new ScriptWriterText(database, logFileName, false, false, false);
dbLogWriter.setWriteDelay(writeDelay);
dbLogWriter.start();
使用延迟写的方式启动了ScriptWriterText,用于在执行了任何更改语句时记脚本日志到.log文件中。



4.1.1.3 如果是新建数据库

委托userManager创建个用户,默认是SA
委托schemaManager创建个schema,默认名字为PUBLIC
logger.checkpoint() 功能跟上小节中的checkpoint()一样。



4.1.1.4 lobManager.open()

打开大字段处理器:其中以随机读写的方式打开dbPath.lobs文件(若存在的话);然后若SYSTEM_LOBS.BLOCKS表中没有设置BLOCK信息,则插入相应数据。



4.1.1.5 后初始化CheckPointRunner和TimeoutRunner

CheckPointRunner是一个Runnable,其run方法中创建一个sys用户的session,并在其中执行一个"CHECKPOINT"命令的StatementCommand。该命令终会调用session.database.logger.checkpoint(session, defrag, true),其功能在上面介绍过了。该runnable会在每一次session.commit()时被触发调度执行。
TimeoutRunner是一个Runnable,与TimeoutManager合作管理session超时。



4.2 客户端与服务端建立连接流程
1. Client端通过DriverManager.getConnection(url, user, password)来获得一个Connection。按照JDBC的约定,HyperSQL自己实现的JDBCDriver在加载时将自己注册到DriverManager中,然后当调用DriverManager.getConnection时会调用到JDBCDriver的connect方法:





public static Connection getConnection(String url,
Properties info) throws SQLException {


final HsqlProperties props = DatabaseURL.parseURL(url, true, false);


if (props == null) {


// supposed to be an HSQLDB driver url but has errors
throw JDBCUtil.invalidArgument();
} else if (props.isEmpty()) {


// is not an HSQLDB driver url
return null;
}


long timeout = 0;


if (info != null) {
timeout = HsqlProperties.getIntegerProperty(info, "loginTimeout", 0);
}


props.addProperties(info);


if (timeout == 0) {
timeout = DriverManager.getLoginTimeout();
}


// @todo: maybe impose some sort of sane restriction
// on network connections regardless of user
// specification?
if (timeout == 0) {


// no timeout restriction
return new JDBCConnection(props);
}


String connType = props.getProperty("connection_type");


if (DatabaseURL.isInProcessDatabaseType(connType)) {
return new JDBCConnection(props);
}


// @todo: Better: ThreadPool? HsqlTimer with callback?
final JDBCConnection[] conn = new JDBCConnection[1];
final SQLException[] ex = new SQLException[1];
Thread t = new Thread() {


public void run() {


try {
conn[0] = new JDBCConnection(props);
} catch (SQLException se) {
ex[0] = se;
}
}
};


t.start();


try {
t.join(1000 * timeout);
} catch (InterruptedException ie) {
}


try {


// PRE:
// deprecated, but should be ok, since neither
// the HSQLClientConnection or the HTTPClientConnection
// constructor will ever hold monitors on objects in
// an inconsistent state, such that damaged objects
// become visible to other threads with the
// potential of arbitrary behavior.
t.stop();
} catch (Exception e) {
} finally {
try {
t.setContextClassLoader(null);
} catch (Throwable th) {
}
}


if (ex[0] != null) {
throw ex[0];
}


if (conn[0] != null) {
return conn[0];
}


throw JDBCUtil.sqlException(ErrorCode.X_08501);
}
其中首先委托DatabaseURL来解析连接url的参数;
然后获得参数中的timeout值"loginTimeout":首先尝试获取本次连接参数;如果没有就尝试获得DriverManager中定义的参数
接着根据timeout是否为0,执行new JDBCConnection(props)或者开启一个Thread执行new JDBCConnection(props)并使用join设置过期时间。其中获取参数值并根据connType来确定实例化哪种SessionInterface:



若是一个In-Process Type,就直接使用DatabaseManager.newSession()来创建一个Session来使用。
若是hsql://类型,就创建一个new ClientConnection()代理
若是http://类型,就创建一个new ClientConnectionHttp()代理


在后两者中,需要与服务端进行通信,并在创建Connection的时候执行一次CONNECT命令:



首先使用



Result.newConnectionAttemptRequest(user, password,
database, zoneString, timeZoneSeconds);
创建一个ResultConstants.CONNECT类型的Result r,其中包括了用户名密码和要连接的数据库名称。
然后执行execute(r):
其中首先为r设置sessionId和databaseId
然后调用write(r)->r.write(this, dataOutput, rowOut)
后,调用read()->Result.newResult(dataInput, rowIn)得到返回结果。

2. 当服务端收到一个socket连接请求时,会开启一个ServerConnection/WebServerConnection新线程处理。以ServerConnection来介绍,其中首先会执行握手(shakehand)验证,然后基于Result语义来接收Client端的请求:



private void receiveResult(int resultMode) throws CleanExit, IOException {


boolean terminate = false;
Result resultIn = Result.newResult(session, resultMode, dataInput,
rowIn);


resultIn.readLobResults(session, dataInput, rowIn);
server.printRequest(mThread, resultIn);


Result resultOut = null;


switch (resultIn.getType()) {


case ResultConstants.CONNECT : {
resultOut = setDatabase(resultIn);


break;
}
case ResultConstants.SQLCANCEL : {
resultOut = cancelStatement(resultIn);
terminate = true;


break;
}
case ResultConstants.DISCONNECT : {
resultOut = Result.updateZeroResult;
terminate = true;


break;
}
case ResultConstants.RESETSESSION : {
session.resetSession();


resultOut = Result.updateZeroResult;


break;
}
case ResultConstants.EXECUTE_INVALID : {
resultOut =
Result.newErrorResult(Error.error(ErrorCode.X_07502));


break;
}
default : {
resultOut = session.execute(resultIn);


break;
}
}


resultOut.write(session, dataOutput, rowOut);
rowOut.reset(mainBuffer);
rowIn.resetRow(mainBuffer.length);


if (terminate) {
throw cleanExit;
}
}
由上面代码可知,根据client端请求类型的不同,Server端会执行不同的操作并返回不同的结果。当请求类型是ResultConstants.CONNECT时,会执行:



private Result setDatabase(Result resultIn) {


try {
String databaseName = resultIn.getDatabaseName();


dbIndex = server.getDBIndex(databaseName);
dbID = server.dbID[dbIndex];
user = resultIn.getMainString();


if (!server.isSilent()) {
server.printWithThread(mThread + ":Trying to connect user '"
+ user + "' to DB (" + databaseName
+ ')');
}


session = DatabaseManager.newSession(dbID, user,
resultIn.getSubString(),
resultIn.getZoneString(),
resultIn.getUpdateCount());


if (!server.isSilent()) {
server.printWithThread(mThread + ":Connected user '" + user
+ "'");
}


return Result.newConnectionAcknowledgeResponse(session);
} catch (HsqlException e) {
session = null;


return Result.newErrorResult(e);
} catch (RuntimeException e) {
session = null;


return Result.newErrorResult(e);
}
}
其中调用DatabaseManager.newSession来建立一个新的Session,内部委托dbID对应的database.connect来处理,终委托到database对应的sessionManager来实际执行创建session的动作:



Session session = sessionManager.newSession(database, user,
databaseReadOnly, true, zoneString, timeZoneSeconds);
其中,创建一个新的session并放置于sessionMap中维护起来。

后通过Result.newConnectionAcknowledgeResponse(session)来生成返回的结果:

public static Result newConnectionAcknowledgeResponse(Session session) {


Result result = newResult(ResultConstants.CONNECTACKNOWLEDGE);


result.sessionID = session.getId();
result.databaseID = session.getDatabase().getDatabaseID();
result.databaseName = session.getDatabase().getNameString();
result.mainString =
session.getDatabase().getProperties()
.getClientPropertiesAsString();
result.generateKeys = session.getRandomId();


return result;
}
此时,一个客户端到服务端的Connection连接就建立了,客户端使用ClientConnection提供的方法,就像是使用Session中的方法一样。



4.3 一次完整的SQL调用过程
当上面的连接建立完成后,客户端就可以发送Result请求来操作数据库了。一条SQL语句从客户端申请执行到后结果返回的过程如下:









1. 客户端使用JDBC API标准方式执行sql语句:

String url = "jdbc:hsqldb:hsql://localhost:8776/test";
Class.forName("mysocket.jdbc.JDBCDriver");
Connection c = DriverManager.getConnection(url, "SA", "");
Statement stmnt = c.createStatement();
stmnt.execute("DROP TABLE T IF EXISTS;");
stmnt.execute(
"CREATE TABLE T (I IDENTITY, A VARCHAR(50), B CHAR(20));");
stmnt.close();


其中,创建Connection的动作在上面已经介绍了。





2. JDBCStatement.execute或executeQuery中封装请求参数resultOut,并通过sessionProxy(也就是上面连接过程得到的ClientConnection或ClientConnectionHttp)执行resultOut获得返回结果resultIn:

resultOut.setPrepareOrExecuteProperties(sql, maxRows, fetchSize,
statementRetType, queryTimeout, rsProperties, generatedKeys,
generatedIndexes, generatedNames);


try {
resultIn = connection.sessionProxy.execute(resultOut);


performPostExecute();
} catch (HsqlException e) {
throw JDBCUtil.sqlException(e);
}


此时就用到了ClientConnection.execute(resultOut),其中通过ClientConnection的dataOutput和dataInput(缓存为rowOut和rowIn)来与Server端通信。





3. Server端在收到请求时,调用函数:

private void receiveResult(int resultMode) throws CleanExit, IOException {


boolean terminate = false;
Result resultIn = Result.newResult(session, resultMode, dataInput,
rowIn);


resultIn.readLobResults(session, dataInput, rowIn);
server.printRequest(mThread, resultIn);


Result resultOut = null;


switch (resultIn.getType()) {


case ResultConstants.CONNECT : {
resultOut = setDatabase(resultIn);


break;
}
case ResultConstants.SQLCANCEL : {
resultOut = cancelStatement(resultIn);
terminate = true;


break;
}
case ResultConstants.DISCONNECT : {
resultOut = Result.updateZeroResult;
terminate = true;


break;
}
case ResultConstants.RESETSESSION : {
session.resetSession();


resultOut = Result.updateZeroResult;


break;
}
case ResultConstants.EXECUTE_INVALID : {
resultOut =
Result.newErrorResult(Error.error(ErrorCode.X_07502));


break;
}
default : {
resultOut = session.execute(resultIn);


break;
}
}
来针对不同类型的请求执行不同类型的操作,对于SQL语句,会执行后的default:



resultOut = session.execute(resultIn);
通过连接时创建的数据库会话session来执行resultIn,在其中又根据不同类型的SQL语句执行不同的操作行为,具体各SQL类型语句的执行过程将在后面详细讲解。

4. 第3步执行完成后,第2步会收到返回结果。若执行的是statement.executeQuery方法,则会通过:



ResultSet getResultSet() throws SQLException {


checkClosed();


ResultSet result = currentResultSet;


if(!connection.isCloseResultSet) {
currentResultSet = null;
}


if (result == null) {


// if statement has been used with executeQuery and the result is update count
// return an empty result for 1.8 compatibility
if (resultOut.getStatementType() == Statement*.RETURN_RESULT) {
return JDBCResultSet.newEmptyResultSet();
}
}


return result;
}
来获取查询返回结果。后面提到的DML,DDL,DQL等除去实际执行逻辑和返回结果的不同,整体的调用过程都是一样的。



4.4 Server端对SQL语句的处理过程
对于上面4.3节的第3步,当请求是SQL语句类型时,会执行到:





resultOut = session.execute(cmd);
其中,根据传递的Result类型的参数cmd的属性mode值决定具体的执行逻辑,以客户端使用JDBC Statement举例,当客户端直接使用statement.execute(sql)来执行时,传递的cmd.mode==ResultConstants.EXECDIRECT,此时执行的逻辑是:



case ResultConstants.EXECDIRECT : {
Result result = executeDirectStatement(cmd);


result = performPostExecute(cmd, result);


return result;
}
在executeDirectStatement(cmd)中首先委托ParserCommand.compileStatement(sql, cmd)来将sql解析为List<Statement>,注意可以定义多条语句,只要启动Server时不将参数sql.restrict_exec指定为true就可以,执行多条语句只会返回后一条的结果;然后对于解析好的每一个Statement cs,执行session.executeCompiledStatement(cs, ValuePool.emptyObjectArray, queryTimeout)。客户端的各种使用方式,如使用Statement或PreparedStatement,执行execute方法或executeBatch方法等终都是调用到session.executeCompiledStatement函数,该函数就是具体执行一条SQL命令的过程,其方法签名为:

public Result executeCompiledStatement(Statement cs, Object[] pvals,
int timeout)
其中,参数cs代表要执行的sql语句解析完后的Statement对象;pvals代表给定的数据,对于statement.execute(sql)这种方式pvals就是空,而对于preparedStatement.execute()这种调用方式,pvals才有意义,代表了preparedStatement之前设置的数据。该方法对于事务型语句的执行会结合着事务管理器进行,整体执行过程描述如下:

• 首先判断事务是否被废除,若是,则执行回滚及清除session中事务相关的数据。因为事务在等待锁资源或执行过程中可能被取消。

• 接着判断事务实行过程中上下文嵌套的内层不允许是Routine中的javaMethod,或autoCommit型的语句,如果是的话直接返回错误。

• 然后判断要执行的statement,如果statement是autoCommint,那么首先执行一次commit(false),由于此时session.isTransaction为false,所以并不执行database.txManager.commitTransaction(session)。主要作用是重置SessionContext与SessionData以及rowActionList。(一般而言,Schema定义和操作语句是自动提交的事务性的,基于SQL标准。HyperSQL2.3对于这种事务会在执行之前和之后自动执行commit。如此一来,schema相关的语句不能被回滚。这种情况有可能在未来的版本里改变)

• 然后判断cs是否是一个transactionStatement:若不是,直接执行并返回;若是,则执行事务操作(结合TransactionManager来完成事务)。TransactionManager是一个接口,定义了HSQLDB事务管理器对外提供的公共方法,以及支持的事务并发管理模型。在HyperSQL 2中支持二阶段锁模型(LOCKS)、多版本并发控制模型(MVCC)以及它俩的混合模型(MVLOCKS)三种。事务管理器接口说明及并发管理模型的实现细节将在另一篇文章中专门介绍,当前仅基于接口层面介绍语句执行的整体流程:

1> 记录当前语句开始执行时的rowActionList下标(rowActionList类似于undo日志概念,记录了每一次操作的实际操作动作、session信息、针对的行记录备份及执行时间戳等信息,供回滚或确认写入commit日志),该下标用于确认或回滚当前语句涉及到的所有rowAction。

2> 调用database.txManager.beginAction(session, statement) 判断是否可以启动事务并设置preTransaction标志:

其中首先通过boolean canProceed=setWaitedSessionTPL(session. cs) 来判断当前要执行语句cs所需读写的表是否被其他session占用锁定。对于cs要读的表,只需要判断是否有其他session为其加了写锁,若有则需等待该session释放该锁;而对于cs要写的表,需要判断是否有其他session为其加了读锁或写锁,若有则需要等待该session释放该锁。终,将当前语句需要等待释放锁的所有session放入tempSet中。

若session.tempSet为空,则说明当前没有任何其他session在读或写cs要操作的TableName,则直接返回true,说明可以进行下一步。否则通过checkDeadlock(session, session.tempSet)检查是否会有死锁,若没有死锁,也返回true(检查死锁的逻辑是对于每一个依赖路径上的session,依赖它的session不能与它依赖的session相同)。若存在死锁,则返回false(返回之前将session.tempSet置空,并且将session.abortTransaction置为true)。



若上一步返回的canProceed为true,则首先将session.isPreTransaction设置为true,然后判断session.tempSet,若其为空则说明无需等待其他session释放资源,则执行lockTablesTPL(session, cs)来实际执行锁表。若session.tempSet不为空则说明需要等待其他session释放锁,那么就将本session设置进它等待的那些session的waitingSessions列表中 以便后续这些session在结束事务或动作并释放资源时通知到本session ,并重新设置本session的latch(本session正在等待释放资源的sessions的个数)。



3> 判断redoAction标志,如果需要重做,说明在此期间txManager被重设过,其他session修改了事务并发控制模式。若redoAction为true就continue,重新执行。

4> 设置执行的超时时间,并阻塞等待其他session释放必须的锁资源,通过latch.await();等待latch(本session等待释放锁资源的session个数)变为0,才能继续执行后面步骤。

5> 如果动作超时,那么abortAction标志为true,则执行废除本语句执行的后续操作。

6> 再次判断是否废除事务,若是,则执行回滚及清除session中事务相关的数据。因为事务在等待锁资源或执行过程中可能被取消。

7> database.txManager.beginActionResume(session),实际开始执行动作,若此时事务尚未开启,顺便开启事务:



其中设置全局事务时间戳,并且设置session.isPreTransaction为false;设置session.isTransaction为true。并将TransactionManager.transactionCount++。
8> SessionContext.setDynamicArguments(pvals); 主要是给PreparedStatement使用,若为Statement.execute(sql),则pvals值为[]。
9> r = cs.execute(session); 此乃真正的SQL语句执行的逻辑,cs就是前面对SQL语句解析后生成的Statement的具体子类,针对DDL,DML,DQL类型的SQL会生成不同类型的Statement,其各自execute函数也有不同的逻辑,但摒除了语义上的差异所有SQL的执行流程基本上都一致。执行过程中会在session.rowActionList中保存每一个增删操作作为undo日志(更新是通过先删除再插入来实现的),供后续commit或rollback。具体语句各自的业务逻辑将在下面章节分析。



10> 根据sqlEventLogLevel记录语句执行动作,在.sql.log中。默认的级别是LOG_NONE,不记录任何sql语句执行动作日志。

11> endAction(r),清理session中执行语句时生成的中间表的持久化仓储,并视执行结果调用database.txManager.rollbackAction(session)还是database.txManager.completeAction(session)。其中rollbackAction会执行rollbackPartial,通过actionIndex和actionStartTimestamp识别出来本语句执行过程中的所有rowActions,根据这些rowActions执行回滚操作。终进一步调用txManager.endActionTPL(session),视session的隔离级别而解锁前面步中setWaitedSessionTPL方法设置的涉及到的表名的读锁(注意,语句执行完毕后只可能释放读锁,不会释放写锁)。

12> 终再次判断一遍是否取消事务,或者是txManager被重设导致的redoAction为true。并执行相应的处理。

13> 根据执行结果判断是rollback整个事务,还是commit整个事务。其中首先调用了database.txManager.rollback(session)或database.txManager.commitTransaction(session),委托事务管理器实际执行整个事务的回滚操作还是确认并记录commit日志操作(通过rowActionList),释放锁资源并计算后续的锁资源依赖关系等。然后再执行session层面上的事务相关中间数据的清理与重置:

private void endTransaction(boolean commit, boolean chain)
其中将rowActionList,sessionContext的stack、savepoints、savepointTimestamps以及sessionData中的事务范围的表及导航器等等全部清空。

14> 终返回Result类型的执行结果: return r;



• 后针对上一步返回的结果(后一条语句的结果)执行result = performPostExecute(cmd, result) 对result进行一些后处理,并将其返回。

其中,TransactionManager是一个接口,定义了HSQLDB事务管理器对外提供的公共方法,以及支持的事务并发管理模型。在HyperSQL 2中支持二阶段锁模型(LOCKS)、多版本并发控制模型(MVCC)以及它俩的混合模型(MVLOCKS)三种。事务管理器接口说明及并发管理模型的实现细节将在另一篇文章中专门介绍。
————————————————
版权声明:本文为CSDN博主「不动明王1984」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/m0_37962779/article/details/79127655

相关文章