TiDB learning (1)

2020-05-27

TiDB basic introduction
Component
The TiDB cluster consists of three major components: TiDB Server, PD Server, and TiKV Server (Figure 1).



The functions of each component are as follows:

  • TiDB Server, which can be understood as the SQL layer, receives SQL requests, handles SQL parsing, optimization, and related logic, and interacts with the underlying TiKV to read or change data;
  • PD Server, which can be regarded as the brain of the cluster, stores the cluster's various metadata, schedules data and balances load across TiKV nodes, and allocates the cluster's global transaction IDs;
  • TiKV Server, the storage layer of the cluster, where data is ultimately stored in Key-Value form; PD schedules data across the KV nodes to keep the load of every node balanced. The TiKV cluster can also be used independently as a distributed Key-Value store.

TiDB core characteristics
Horizontal scaling of compute and storage
TiDB Server handles SQL requests. As the business grows, you can simply add TiDB Server nodes to increase overall compute capacity. TiKV is responsible for storing data; as the data volume grows, more TiKV Server nodes can be deployed to increase the overall storage capacity of the cluster. PD schedules data between TiKV nodes and migrates part of the data to newly added nodes. This process requires no human intervention.
High availability and self-healing
The TiDB / TiKV / PD components can each tolerate the failure of some instances without affecting the availability of the whole cluster.

  • If a TiDB Server node fails, only the sessions on that node are affected. After a connection error, the application can resend the request to another healthy TiDB Server node through front-end load-balancing middleware.
  • If a PD node fails and it is not the Raft leader (the PD cluster guarantees its own data consistency through the Raft protocol), there is no impact; otherwise a new leader is re-elected, during which the service is unavailable externally (roughly 3 seconds).
  • If a TiKV node fails, all Regions on that node are affected. For Regions whose leader is not on the failed node (the TiKV cluster also guarantees data consistency between nodes through Raft), service is unaffected; otherwise service is interrupted until a new leader is elected. If PD confirms that the failed TiKV node cannot recover, it automatically schedules that node's data onto other healthy TiKV nodes.

Other TiDB features and principles

  • Highly compatible with the MySQL protocol and syntax;
  • Distributed transactions;
  • Strong data consistency across data centers;
  • Highly concurrent real-time writes and real-time queries over massive data.

TiDB evolution background
Background
About two years ago, I worked on MySQL sharding and middleware. At that time we did sharding in the middleware layer and expanded a 16-node MySQL deployment to 32 nodes; we had to rehearse a month in advance, and the actual operation still took a week. I kept wondering: could there be a database that would free us from this kind of work? We had just finished Codis and felt that a distributed approach was the right direction. I had also been following the latest academic work on distributed databases and had read the Spanner and F1 papers Google published in 2013, so I decided to start writing a database and solve MySQL's scalability problem at the root.

Initial versions

  • Version 0.2: TiDB had only a SQL parser and no storage engine
  • Version 0.5: the SQL layer was largely separated from the storage layer, and HBase was used as the distributed storage engine
  • Version 1.0: the storage engine was changed to RocksDB-based storage




TiKV
Key-Value
As a system for saving data, the first thing to decide is the storage model, that is, in what form the data is stored. TiKV chose a Key-Value model and provides ordered traversal. In simple terms, you can think of TiKV as a huge Map, where both Key and Value are raw byte arrays, and in this Map the Keys are ordered by the raw binary order of the byte arrays.
Two things to remember about TiKV:
1. It is a huge Map that stores Key-Value pairs;
2. The Key-Value pairs in this Map are ordered by the binary order of the Keys; that is, we can Seek to the position of a particular Key and then call Next repeatedly to fetch Key-Value pairs with Keys larger than this one, in increasing order (see the sketch below).
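As a minimal illustration of this Seek/Next access pattern, here is a toy sketch in Go that stands in for TiKV's ordered Map with a sorted in-memory slice; the types and names are purely illustrative, not TiKV's actual API:

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// kv is one Key-Value pair; both sides are raw byte arrays, as in TiKV.
type kv struct {
	key, value []byte
}

// orderedMap stands in for TiKV's huge Map: entries are kept sorted
// by the raw binary order of their keys.
type orderedMap struct {
	entries []kv
}

// seek returns the index of the first entry whose key is >= target.
func (m *orderedMap) seek(target []byte) int {
	return sort.Search(len(m.entries), func(i int) bool {
		return bytes.Compare(m.entries[i].key, target) >= 0
	})
}

func main() {
	m := &orderedMap{entries: []kv{
		{[]byte("a"), []byte("1")},
		{[]byte("b"), []byte("2")},
		{[]byte("c"), []byte("3")},
	}}
	// Seek to "b", then keep advancing (the "Next" calls) to visit
	// Key-Value pairs in increasing Key order.
	for i := m.seek([]byte("b")); i < len(m.entries); i++ {
		fmt.Printf("%s -> %s\n", m.entries[i].key, m.entries[i].value)
	}
}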
Having said that, some people may ask, what is the relationship between the storage model and the table in SQL? There is one important thing to say here four times:
The storage model here is independent of the Table in SQL!
The storage model here is independent of the Table in SQL!
The storage model here is independent of the Table in SQL!
The storage model here is independent of the Table in SQL!
Now let's forget any concepts in SQL and focus on how to implement TiKV, a high-performance, high-reliability, huge (distributed) Map.
RocksDB
For any persistent storage engine, data ultimately has to be stored on disk, and TiKV is no exception. However, TiKV does not write data directly to disk; instead it stores data in RocksDB, and RocksDB takes care of the actual persistence. The reason for this choice is that developing a standalone storage engine takes a great deal of work, especially a high-performance one that requires all kinds of fine-grained optimization. RocksDB is an excellent open-source standalone storage engine that satisfies all our requirements for a single-node engine, and the Facebook team keeps optimizing it, so with very little effort we get a powerful and continuously improving single-node engine. Here you can simply think of RocksDB as a standalone Key-Value map.
Raft
Next we face a harder problem: how do we ensure that data is not lost and not corrupted when a single machine fails?
Simply put, we need a way to replicate the data to multiple machines, so that when one machine goes down we still have copies on other machines.
The Raft protocol is used here. Raft is a consensus algorithm equivalent to Paxos but easier to understand. Interested readers can refer to: [raft.github.io/raft.pdf]
Raft is a consensus protocol that provides several important features:
1. Leader election
2. Membership changes
3. Log replication
TiKV uses Raft for data replication. Each data change is recorded as a Raft log entry, and Raft's log replication synchronizes the data safely and reliably to a majority of the nodes in the group.



To summarize: with standalone RocksDB we can store data on disk quickly; with Raft we can replicate data to multiple machines to guard against single-machine failure. Data is written through the interface of the Raft layer instead of directly into RocksDB. With Raft implemented, we have a distributed KV, and we no longer have to worry about a single machine going down.
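To make the write path concrete, here is a heavily simplified sketch: a toy in-process "Raft" appends each change to every replica's log and then applies it to each replica's local store (a map standing in for RocksDB). Real Raft adds leader election, terms, and majority acknowledgement, all omitted here:

package main

import "fmt"

// logEntry is one data change encoded as a Raft log record.
type logEntry struct {
	key, value string
}

// replica is a simplified node: a Raft log plus a local KV engine
// standing in for RocksDB.
type replica struct {
	log   []logEntry
	store map[string]string
}

// propose routes a write through the "Raft layer": the entry is first
// appended to the logs, and only then applied to each local store.
func propose(group []*replica, e logEntry) {
	for _, r := range group {
		r.log = append(r.log, e)
	}
	for _, r := range group {
		r.store[e.key] = e.value
	}
}

func main() {
	group := []*replica{
		{store: map[string]string{}},
		{store: map[string]string{}},
		{store: map[string]string{}},
	}
	propose(group, logEntry{"k1", "v1"})
	fmt.Println(group[2].store["k1"]) // every replica now holds "v1"
}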
Region
Having come this far, we can introduce a very important concept: Region. It is the basis for understanding the series of mechanisms that follow.
As mentioned earlier, we view TiKV as one huge, ordered KV Map, so to achieve horizontal scaling of storage we need to spread the data across multiple machines. Spreading data across machines is not the same thing as Raft's data replication; in this section let's forget Raft for a moment and assume all data has only one copy, which is easier to understand.
For a KV system, there are two typical scenarios for distributing data across multiple machines:

  • One is to hash by Key and assign data to storage nodes according to the hash value;
  • The other is to divide by Range, storing a segment of consecutive Keys on one storage node.

TiKV chose the second approach: it divides the whole Key-Value space into many segments, each of which is a series of consecutive Keys. Each segment is called a Region, and we try to keep the data stored in each Region under a certain size (this size is configurable; the current default is 64MB). Each Region can be described as a left-closed, right-open interval from StartKey to EndKey.


Note that the Region here has nothing to do with the tables in SQL! Please continue to forget SQL, just talk about KV.
After dividing the data into Regions, there are two important things to do:

  • Distribute the data across all nodes in the cluster in units of Regions, and try to keep the number of Regions served by each node roughly equal
  • Perform Raft replication and membership management in units of Regions

These two points are very important; let's go through them one at a time.
For the first point, the data is divided into many Regions by Key, and each Region's data is stored on only one node (ignoring replication for now). Our system has a component responsible for spreading Regions as evenly as possible across all the nodes in the cluster. This achieves horizontal scaling of storage capacity (after a new node is added, Regions from other nodes are automatically scheduled onto it) and also load balancing (we avoid the situation where one node holds lots of data while others hold none). At the same time, to ensure that upper-layer clients can reach the data they need, the system has a component that records the distribution of Regions across nodes: given any Key, it can tell which Region the Key belongs to and which node that Region currently lives on.
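A small sketch of that routing step, assuming Regions are described by left-closed, right-open key ranges kept in sorted order; the struct and function names are made up for illustration:

package main

import (
	"bytes"
	"fmt"
	"sort"
)

// region describes one [StartKey, EndKey) range and the node serving it.
type region struct {
	startKey, endKey []byte
	node             string
}

// locate finds the Region containing key, assuming the regions slice is
// sorted by startKey and covers the whole key space without gaps.
func locate(regions []region, key []byte) region {
	i := sort.Search(len(regions), func(i int) bool {
		return bytes.Compare(regions[i].startKey, key) > 0
	})
	return regions[i-1]
}

func main() {
	regions := []region{
		{[]byte(""), []byte("g"), "tikv-1"},
		{[]byte("g"), []byte("p"), "tikv-2"},
		{[]byte("p"), nil, "tikv-3"}, // nil end key means +infinity
	}
	r := locate(regions, []byte("hello"))
	fmt.Printf("key %q is in region [%q, %q) on %s\n", "hello", r.startKey, r.endKey, r.node)
}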
For the second point, TiKV replicates data in units of Regions: each Region's data is kept in multiple copies, and we call each copy a Replica. Replicas keep their data consistent through Raft. The multiple Replicas of a Region are stored on different nodes and form a Raft Group. One Replica acts as the Leader of the group and the others are Followers. All reads and writes go through the Leader, which then replicates to the Followers.
After you understand the Region, you should be able to understand the following picture:


We disperse and replicate data in units of Regions, and the result is a distributed Key-Value system with a certain level of fault tolerance: we no longer need to worry about losing data when a disk fails. That is already nice, but not yet perfect; we need more features.
MVCC
Many databases implement multi-version concurrency control (MVCC), and TiKV is no exception. Imagine a scenario where two clients modify the value of the same Key at the same time: without MVCC the data would need to be locked, and in a distributed setting that can cause performance problems and deadlocks.
TiKV's MVCC is implemented by appending a Version to the Key. Simply put, without MVCC you can think of TiKV's keys like this:
Key1 -> Value
Key2 -> Value
……
KeyN -> Value
With MVCC, the Key arrangement of TiKV is like this:
Key1-Version3 -> Value
Key1-Version2 -> Value
Key1-Version1 -> Value
……
Key2-Version4 -> Value
Key2-Version3 -> Value
Key2-Version2 -> Value
Key2-Version1 -> Value
……
KeyN-Version2 -> Value
KeyN-Version1 -> Value
……
Note that for multiple versions of the same Key, we place the version with the larger number first and the smaller one after it (recall from the Key-Value section that Keys are arranged in order). When a user reads a Value through a Key + Version, the Key and the Version can be combined into the MVCC Key, i.e. Key-Version. We can then directly Seek(Key-Version) to locate the first position greater than or equal to this Key-Version.
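Here is a rough sketch of one way such an MVCC Key could be encoded so that, under plain byte order, newer versions of the same Key sort first; this illustrates the idea only and is not TiKV's exact codec:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
	"sort"
)

// mvccKey appends an encoded version to the user key so that larger
// (newer) versions of the same key produce smaller suffixes and thus
// sort first: the version is stored as MaxUint64-version, big-endian.
func mvccKey(key []byte, version uint64) []byte {
	buf := make([]byte, len(key)+8)
	copy(buf, key)
	binary.BigEndian.PutUint64(buf[len(key):], math.MaxUint64-version)
	return buf
}

func main() {
	keys := [][]byte{
		mvccKey([]byte("Key1"), 1),
		mvccKey([]byte("Key1"), 3),
		mvccKey([]byte("Key1"), 2),
	}
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })
	// After sorting, Key1-Version3 comes first, then Version2, then Version1,
	// so Seek(Key-Version) lands on the newest version not exceeding the
	// requested one.
	for _, k := range keys {
		v := math.MaxUint64 - binary.BigEndian.Uint64(k[4:])
		fmt.Printf("Key1-Version%d\n", v)
	}
}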
Detailed principles can be referred to:pingcap.com/blog-cn/mvc
Transaction
TiKV's transactions are based on the Percolator model (research.google.com/pub), with many optimizations on top.
Only one point is mentioned here: TiKV's transactions use optimistic locking. Write conflicts are not detected while the transaction executes; conflict detection happens only during commit. Of two conflicting transactions, the one that commits earlier succeeds, and the other retries the whole transaction.
This model performs well when write conflicts in the workload are rare, for example when randomly updating rows of a large table. But if write conflicts are frequent, performance is poor. An extreme example is a counter: many clients update a small number of rows at the same time, causing serious conflicts and a large number of wasted retries (see the toy illustration below).
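The following toy Go program illustrates commit-time conflict detection and whole-transaction retries on a single hot counter row; it is only a sketch of the optimistic-locking idea, not TiKV's Percolator implementation:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// row models a single key with a version; a commit succeeds only if the
// version is unchanged since the transaction read it.
type row struct {
	mu      sync.Mutex
	value   int64
	version int64
}

// tryCommit is the commit-time conflict check: the write wins only if
// nobody else committed in between; otherwise the caller must retry.
func (r *row) tryCommit(readVersion, newValue int64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.version != readVersion {
		return false // write conflict detected at commit time
	}
	r.value = newValue
	r.version++
	return true
}

func main() {
	var retries int64
	r := &row{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // many clients all updating one counter row
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// "Execute" the transaction: read, compute, then commit.
				r.mu.Lock()
				readVer, val := r.version, r.value
				r.mu.Unlock()
				if r.tryCommit(readVer, val+1) {
					return
				}
				atomic.AddInt64(&retries, 1) // loser re-runs the whole txn
			}
		}()
	}
	wg.Wait()
	fmt.Printf("final value = %d, wasted retries = %d\n", r.value, retries)
}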
TiDB
SQL principles
Here we simply view the relational model as Tables plus SQL statements; the question then becomes how to store a Table on top of the KV structure and how to run SQL statements on top of the KV structure.
Suppose we have a definition of such a table:
CREATE TABLE User (
    ID int,
    Name varchar(20),
    Role varchar(20),
    Age int,
    PRIMARY KEY (ID),
    KEY idxAge (Age)
);
There is a huge difference between the SQL and KV structures, so how to map between them easily and efficiently becomes an important problem. A good mapping scheme must make data manipulation convenient.
For a Table, the data that needs to be stored consists of three parts:
1. Meta information of the table
2. Rows in the table
3. Index data
DML operation
For Rows, we can choose either row-based or column-based storage, each with its own pros and cons. TiDB's primary target is OLTP workloads, which need to read, save, modify, and delete a single row of data quickly, so row storage is the right fit.
For Indexes, TiDB needs to support not only the Primary Index but also Secondary Indexes. Indexes serve to speed up queries and to guarantee certain constraints.
There are two modes when querying:

  • Point queries, for example through an equality condition on a Primary Key or Unique Key, such as select name from user where id=1;, which needs to locate a single row of data quickly via the index;
  • Range queries, such as select name from user where age > 30 and age < 35;, which need to read the data with age between 30 and 35 via the idxAge index. Indexes are also divided into Unique Index and non-Unique Index, and both need to be supported.

After analyzing the characteristics of the data that needs to be stored, let us look at the operational requirements for these data, mainly considering the four statements Insert/Update/Delete/Select.
For the Insert statement, you need to write the Row to the KV and build the index data.
For the Update statement, you need to update the index data (if necessary) while updating the Row.
For the Delete statement, you need to delete the index while deleting the Row.
The first three statements are very simple to handle. The Select statement is more complicated. First, we need to be able to read a single row of data simply and quickly, so every Row needs an ID (explicit or implicit). Second, we may need to read consecutive rows of data, such as select * from user;. Finally, we need to read data through an index, and the index may be used either for point lookups or for range queries.
TiDB assigns a TableID to each table, an IndexID to each index, and a RowID to each row (if the table has an integer Primary Key, the value of the Primary Key is used as the RowID). The TableID is unique within the whole cluster, IndexID/RowID are unique within a table, and these IDs are all of type int64.

  • Each row of data is encoded into a Key-Value pair according to the following rules:

Key: tablePrefix_rowPrefix_tableID_rowID
Value: [col1, col2, col3, col4]
The tablePrefix/rowPrefix of Key is a specific string constant used to distinguish other data in the KV space.

  • For Index data, it will be encoded into a Key-Value pair according to the following rules:

Key: tablePrefix_idxPrefix_tableID_indexID_indexColumnsValue
Value: rowID
The Index data also needs to handle both the Unique Index and the non-Unique Index cases. For a Unique Index, the encoding rule above can be used as is. For a non-Unique Index, however, this encoding does not produce a unique Key, because tablePrefix_idxPrefix_tableID_indexID_ is the same for the same Index and multiple rows may share the same ColumnsValue. The encoding for the non-Unique Index is therefore adjusted to:
Key: tablePrefix_idxPrefix_tableID_indexID_ColumnsValue_rowID
Value:null
This allows a unique Key to be constructed for each row of data in the index.
Note that the various xxPrefix values in the Keys of the encoding rules above are string constants. Their role is to distinguish namespaces so that different types of data do not conflict with each other. They are defined as follows:
var (
	tablePrefix     = []byte{'t'}
	recordPrefixSep = []byte("_r")
	indexPrefixSep  = []byte("_i")
)
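As a rough sketch of these rules, the helpers below build Row and Index keys using a simplified, human-readable underscore encoding instead of TiDB's real memcomparable byte codec; the function names are made up for illustration:

package main

import "fmt"

// encodeRowKey builds tablePrefix_rowPrefix_tableID_rowID.
func encodeRowKey(tableID, rowID int64) string {
	return fmt.Sprintf("t_r_%d_%d", tableID, rowID)
}

// encodeIndexKey builds the non-Unique Index form, with the rowID
// appended so that every index entry gets a unique Key.
func encodeIndexKey(tableID, indexID, columnValue, rowID int64) string {
	return fmt.Sprintf("t_i_%d_%d_%d_%d", tableID, indexID, columnValue, rowID)
}

func main() {
	fmt.Println(encodeRowKey(10, 1))          // t_r_10_1
	fmt.Println(encodeIndexKey(10, 1, 10, 1)) // t_i_10_1_10_1
}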
In addition, note that in the scheme above, whether for a Row or for an Index, all Rows of a Table share the same Key prefix, and all data of one Index also shares the same prefix. Data with the same prefix is therefore arranged together in TiKV's Key space. As long as we design the encoding of the suffix carefully, so that the order of values before encoding matches the order of the encoded byte arrays, Rows and Index data can be stored in TiKV in order. A scheme that guarantees the comparison result is the same before and after encoding is called memcomparable: for a value of any type, the result of comparing two values in their original type must match the result of comparing their encoded byte arrays (note that both Keys and Values in TiKV are raw byte arrays). For the concrete encoding scheme, see TiDB's codec package (github.com/pingcap/tidb). With this encoding, all Row data of a table is arranged in TiKV's Key space in RowID order, and the data of an Index is arranged in order of the Index's ColumnValue.
Now let's revisit the requirements listed at the beginning against TiDB's mapping scheme and check whether the scheme satisfies them. First, the mapping turns both Row and Index data into Key-Value data, and each row and each index entry has a unique Key. Second, the scheme is friendly to both point and range queries: we can easily construct the Key for a particular row or index entry, or the Key range for a block of adjacent rows or adjacent index values. Finally, when enforcing certain constraints in a table, we can construct the corresponding Key and check whether it already exists to determine whether the constraint is satisfied.
So far we have covered how a Table maps to KV. Here is a simple example to aid understanding, again using the table structure above. Suppose there are 3 rows of data in the table:
1, "TiDB", "SQL Layer", 10
2, "TiKV", "KV Engine", 20
3, "PD", "Manager", 30
First, each row of data is mapped to a Key-Value pair. Since this table has an integer Primary Key, the value of the Primary Key is used as the RowID. Suppose the table's TableID is 10; its Row data is then:
t_r_10_1 --> ["TiDB", "SQL Layer", 10]
t_r_10_2 --> ["TiKV", "KV Engine", 20]
t_r_10_3 --> ["PD", "Manager", 30]
In addition to the Primary Key, this table has one Index (idxAge). Assuming the ID of this Index is 1, its data is:
t_i_10_1_10_1 --> null
t_i_10_1_20_2 --> null
t_i_10_1_30_3 --> null
Meta information management
The previous sections describe how the data and indexes in a table map to KV; this section describes how meta information is stored. Both Databases and Tables have meta information, namely their definitions and attributes, and this information also needs to be persisted. We store it in TiKV as well. Each Database/Table is assigned a unique ID that identifies it, and when encoding to Key-Value, this ID is encoded into the Key with an m_ prefix. This constructs a Key whose corresponding Value stores the serialized meta information.
In addition, there is a dedicated Key-Value pair that stores the version of the current Schema information. TiDB uses Google F1's Online Schema Change algorithm: a background thread constantly checks whether the Schema version stored on TiKV has changed, and guarantees that a version change (if there is one) is picked up within a bounded amount of time. For the concrete implementation of this part, see the article: implementation of TiDB's asynchronous schema change.
github.com/ngaut/buildd
SQL on KV architecture


SQL operation
Having understood the SQL-to-KV mapping scheme, we know how relational data is stored. Next we need to understand how to use this data to answer the user's queries, that is, how a query operates on the underlying stored data.
The simplest conceivable solution is to map the SQL query onto KV queries through the mapping scheme of the previous section, fetch the corresponding data through the KV interface, and then perform the various computations.
For example, for a statement such as select count(*) from user where name = "TiDB";, we need to read all the data in the table, check whether the Name field is "TiDB" for each row, and if so count that row. This translates into the following KV operation flow:

  • Construct the Key Range: all RowIDs of a table fall in [0, MaxInt64), so using 0 and MaxInt64 together with the Row Key encoding rule we can construct a left-closed, right-open interval [StartKey, EndKey)
  • Scan the Key Range: read the data in TiKV according to the Key Range constructed above
  • Filter the data: for each row read, evaluate the expression name = "TiDB"; if it is true, return the row upward, otherwise discard it
  • Count: add every row that satisfies the condition to the count (a toy sketch of these steps follows the list)
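A toy sketch of these four steps, with an in-memory map standing in for TiKV and comma-separated strings standing in for encoded row values (each key visited here would cost an RPC in the real system):

package main

import (
	"fmt"
	"strings"
)

// naiveCount walks the whole key range of the table, decodes every row,
// filters on name in tidb-server, and counts the matches.
func naiveCount(kv map[string]string, tableID int64, target string) int {
	prefix := fmt.Sprintf("t_r_%d_", tableID) // key range of the whole table
	count := 0
	for key, value := range kv {
		if !strings.HasPrefix(key, prefix) {
			continue // outside the table's key range
		}
		cols := strings.Split(value, ",") // [Name, Role, Age]
		if cols[0] == target {            // evaluate name = "TiDB" locally
			count++
		}
	}
	return count
}

func main() {
	kv := map[string]string{
		"t_r_10_1": "TiDB,SQL Layer,10",
		"t_r_10_2": "TiKV,KV Engine,20",
		"t_r_10_3": "PD,Manager,30",
	}
	fmt.Println(naiveCount(kv, 10, "TiDB")) // 1
}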

The whole process is as follows:


This solution certainly works, but it does not work well. The reasons are obvious:
1. When scanning the data, every row must be read from TiKV through a KV operation, costing at least one RPC; if there is a lot of data to scan, this overhead is huge;
2. Not all rows are useful; rows that do not satisfy the condition need not have been read at all;
3. The values of the rows that do satisfy the condition are not actually needed; all we really need here is the number of rows.
Distributed SQL operation
How to avoid these drawbacks is also obvious. First, we need to move computation as close to the storage nodes as possible to avoid a large number of RPC calls. Second, we need to push the filter down to the storage nodes, so that only valid rows are returned and meaningless network transfer is avoided. Finally, we can also push aggregate functions and GroupBy down to the storage nodes for pre-aggregation: each node returns only a Count value, and tidb-server then sums up these Count values (a small sketch follows).
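A tiny sketch of the pushed-down version, assuming each storage node evaluates the filter locally and returns only its partial count, which tidb-server then sums:

package main

import "fmt"

// partialCount is what each TiKV node would compute locally once the
// filter and the count are pushed down: it returns a single number
// instead of the matching rows.
func partialCount(rows [][]string, target string) int {
	n := 0
	for _, row := range rows {
		if row[0] == target { // name = target, evaluated on the storage node
			n++
		}
	}
	return n
}

func main() {
	// Rows are spread across nodes by Region; tidb-server only sums
	// the per-node counts instead of pulling every row.
	nodeA := [][]string{{"TiDB", "SQL Layer", "10"}}
	nodeB := [][]string{{"TiKV", "KV Engine", "20"}, {"PD", "Manager", "30"}}
	fmt.Println(partialCount(nodeA, "TiDB") + partialCount(nodeB, "TiDB")) // 1
}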
Here is a schematic diagram of data returned layer by layer:


SQL optimization
Speed-up methods of traditional databases
Traditional databases have many ways to speed up queries; two are well known. One is MPP, whose architecture is shown in the figure below. The data for a computation is divided across different nodes, very likely not on a single machine. The nodes are connected through a high-speed network; each node processes its own portion of the data, the results are then combined, and the final result is returned to the user.


The biggest characteristic of this architecture is that it is a shared-nothing architecture: the nodes do not know about each other's computation, each only does its own work, and they do not need to exchange data. That is one kind of architecture.
There is also a type called SMP, the counterpart of MPP, which is a shared-everything architecture.


This kind of architecture usually lives on a single node: a compute node with multiple CPUs that share resources such as memory and IO through the bus; that is the shared-everything architecture. MPP and SMP are two of the approaches traditional databases use to speed things up. Looking at the latest PostgreSQL code, it already supports parallel processing, for example parallel scan: you can set the degree of parallelism for scanning a table and exploit multiple cores for a big speed-up. Of course the speed-up is certainly not linear, because parallel execution requires data exchange, and that overhead becomes significant when the degree of parallelism is too high.
TiDB
After discussing traditional databases, let's talk about TiDB. TiDB's architecture is shown below.


The dashed box is labeled the TiDB SQL layer. Its top layer is the protocol layer, which parses the MySQL protocol. Below that is the SQL layer, responsible for SQL parsing, query optimization, building the query plan, and generating executors; it calls the underlying interfaces to fetch data and then carries out the SQL operations.
The layer below that exposes two interfaces. One is the KV API: we map data onto KV because the bottom layer is a KV storage engine. For a row of data, for example, we build a key from the Row ID plus the Table ID plus the Database ID, use the row's contents as the value, and write the pair into KV, converting the row into key-value form. Indexes are also converted into KV form, because our KV has a useful property: ordered scans. To build an index on some columns, we encode the column values into a key, add things like the index ID and the Table ID, and write it into KV. In other words, the upper layer can store and access data correctly through this API alone. The other is the DistSQL API, which provides an abstraction of our distributed computing framework for the upper layer; it is covered in detail later.
The bottom layer is TiKV. To support our distributed SQL API, we added more capabilities to it: borrowing from HBase's coprocessor design, TiKV provides EndPoint functions, which give the upper layer richer semantics.
So how do we make SQL run faster in TiDB/TiKV?
First, whether for a NewSQL database or a traditional one, work on the optimizer is essential. We have done a lot here, including constant folding, and more will follow, such as constant propagation.
Besides the optimizer, once a query plan is determined, how it is executed also matters a great deal.

  • Exploit the wide distribution of the data to raise the overall degree of parallelism. Our TiDB SQL layer is written in Go, which exploits concurrency well on multi-core machines: goroutine scheduling overhead is very small, so we can spin up many goroutines and use the CPU to increase the parallelism of the computation. In traditional databases such as MySQL or PostgreSQL, the main costs are memory and disk access, but for TiDB a large part of the cost is on the network: sending a request and getting the data back incurs significant overhead. So one very important goal is to make the whole data flow as fast and smooth as possible and to eliminate network overhead wherever we can.



  • Regions: store the data. The Regions in the figure above are the region servers of TiKV. We can insert code into them so they can perform tasks we define for them.
  • TiKV Client: after the SQL Optimizer produces an execution plan, we generate executors from that plan. The TiKV Client is used to access TiKV and send requests via RPC. It also has a very important job: obtaining the data distribution. A table is split into many KV pairs scattered across many TiKV servers, so routing requests to the right data is one of its key tasks.
  • DistSQL API: a wrapper layer between the Executor and the TiKV Client. This means we can connect storage engines other than TiKV, as long as they satisfy the interface definition.
  • Executor: determines the execution logic, i.e. it tells the layers below what needs to be done, for example whether to compute a count or to evaluate a Where condition; it is the part that understands SQL semantics.

SQL layer architecture
The above briefly introduces some features of the SQL layer; hopefully you now have a basic understanding of how SQL statements are processed. In reality, TiDB's SQL layer is much more complex, with many modules and layers. The figure below lists the important modules and their call relationships:


The user's SQL request is sent to tidb-server directly or through a load balancer. tidb-server parses the MySQL Protocol packet to get the request content, then performs syntax parsing, query plan construction and optimization, and executes the query plan to fetch and process the data. All data is stored in the TiKV cluster, so during this process tidb-server needs to interact with tikv-server to get the data. Finally, tidb-server returns the query result to the user.
DDL
DDL in TiDB
TiDB's DDL implements Google F1's online asynchronous schema change algorithm to achieve lock-free, online schema changes in a distributed setting. To simplify the design, TiDB allows only one node to perform DDL operations at any given time. Users can send DDL requests to any TiDB node, but internally all DDL requests are executed by the worker of the owner node.

  • Worker: every node has a worker that handles DDL operations.
  • Owner: only one node in the whole cluster can be elected owner, and every node may be elected to this role; only the worker on the owner node has the right to handle DDL jobs. The owner is elected from the TiDB nodes using Etcd's election mechanism. The owner has a term and actively maintains (renews) it. When the owner node goes down, the other nodes detect this via Etcd and elect a new owner.

Here is a brief overview of TiDB's DDL design. The next two articles detail the design and implementation of TiDB DDL.

  • TiDB asynchronous schema change implementation
  • Asynchronous schema change optimization for TiDB

The following figure depicts a simple processing flow for a DDL request in TiDB:



Figure 1: Process flow of DDL SQL in TiDB
TiDB's DDL component code lives under the ddl directory of the source tree:

  • ddl.go: contains the DDL interface definitions and their implementation.
  • ddl_api.go: provides the API for operations such as create, drop, alter, truncate, and rename, for the Executor to call. Its main job is to wrap a DDL operation into a job, enqueue it in the DDL job queue, and wait for the job to complete before returning.
  • ddl_worker.go: the implementation of the DDL worker. The worker on the owner node takes jobs from the job queue and executes them; after execution completes, the job is stored in the history job queue.
  • syncer.go: responsible for synchronizing the schema version between the owner and the other DDL workers. Every DDL state change increases the schema version ID by 1.
  • owner directory: the owner-related code is kept separately here, implementing functions such as owner election.

In addition, both the ddl job queue and the history ddl job queue are persisted in TiKV. The structure directory contains implementations of list and hash data structures on top of TiKV.
PD
The TiKV cluster is the distributed KV storage engine of the TiDB database. Data is replicated and managed in units of Regions; each Region has multiple Replicas distributed across different TiKV nodes. The Leader is responsible for reads/writes, and the Followers are responsible for synchronizing the raft log sent by the Leader. With this in mind, consider the following questions:

  • How do we ensure that the multiple Replicas of the same Region are on different nodes? Further, what if multiple TiKV instances are started on one machine?
  • When a TiKV cluster is deployed across machine rooms for disaster recovery, how do we ensure that losing one room does not lose multiple Replicas of a Raft Group?
  • After a node is added to the TiKV cluster, how can data from the other nodes in the cluster be moved onto it?
  • What happens when a node goes offline? What does the whole cluster need to do? What if the node is only offline briefly (e.g. a service restart)? What if it is gone for a long time (disk failure, data lost)?
  • Suppose the cluster requires N replicas per Raft Group. A single Raft Group may have too few Replicas (e.g. a node went down and a replica was lost) or too many (e.g. a downed node came back and automatically rejoined the cluster). How is the number of Replicas adjusted?
  • Reads/writes go through the Leader. If the Leaders are concentrated on a small number of nodes, what effect does that have on the cluster?
  • Not all Regions are accessed frequently; the hotspots may be in only a few Regions. What do we need to do then?
  • When the cluster is rebalancing, it often needs to move data around. Does this data migration consume a lot of network bandwidth, disk IO, and CPU, and thereby affect online services?

Each of these problems on its own might have a simple solution, but mixed together they are not easy to solve. Some problems seem to need only the internal state of a single Raft Group, for example deciding whether to add a replica based on whether the number of replicas is sufficient; but in fact, deciding where to add that replica requires global information. The whole system is also changing dynamically: Region splits, node joins, node failures, and shifting access hotspots happen continuously. The scheduling system needs to keep moving toward the optimal state amid these dynamics. Without a component that has global information, can schedule globally, and is configurable, it is hard to meet these requirements. Therefore we need a central node to control and adjust the overall state of the system, and that is the PD module.
Scheduling requirements
Many questions were listed above; let us first organize and classify them. Overall, there are two main categories of problems:
As a distributed, highly available storage system, there are requirements that must be met, four of them:

  • The number of replicas must be neither more nor fewer than required
  • Replicas must be distributed across different machines
  • After a new node is added, replicas from other nodes can be migrated onto it
  • When a node goes offline, the data on that node must be migrated away

As a good distributed system, there are also aspects that need optimization, including:

  • Maintain a uniform distribution of Leader across the cluster
  • Maintain a uniform storage capacity per node
  • Maintain a uniform distribution of access hotspots
  • Control the speed of Balance to avoid affecting online services
  • Manage node status, including manual online/downline nodes, and automatic offline failed nodes

After the first category of requirements is met, the system has multi-replica fault tolerance, dynamic scale-out/scale-in, tolerance of node outages, and automatic recovery from failures. After the second category is met, the overall load becomes more even and the system easier to manage.
To meet these requirements, we first need to collect enough information, such as the state of each node, the information of each Raft Group, and statistics on business access patterns. Second, we need to define some policies; PD then produces a scheduling plan that satisfies the requirements above based on this information and the scheduling policies. Finally, some basic operations are needed to carry out the plan.
Basic operations of scheduling
Let's start with the simplest part: the basic operations of scheduling, that is, what functions we have available for carrying out a scheduling policy. This is the foundation of all scheduling; only when you know what kind of hammer you hold do you know how to drive the nail.
The scheduling requirements above look complicated, but in the end they boil down to three things:

  • Add a Replica
  • Delete a Replica
  • Transfer the Leader role between different Replicas in a Raft Group

The Raft protocol happens to satisfy these requirements: the AddReplica, RemoveReplica, and TransferLeader commands support exactly these three basic operations (a small sketch follows).
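A minimal sketch of how such operations might be represented, assuming a toy policy that only adds Replicas when a Region has too few; the struct and function names are illustrative and are not PD's actual types:

package main

import "fmt"

// operation is one scheduling instruction PD could hand to a Raft Group.
type operation struct {
	kind   string // "AddReplica", "RemoveReplica", or "TransferLeader"
	region uint64
	store  uint64 // target store (node) of the operation
}

// plan emits AddReplica operations until the Region reaches the desired
// replica count; real PD weighs many more signals than this.
func plan(regionID uint64, current, want int, targetStore uint64) []operation {
	var ops []operation
	for i := current; i < want; i++ {
		ops = append(ops, operation{"AddReplica", regionID, targetStore})
	}
	return ops
}

func main() {
	for _, op := range plan(2, 2, 3, 5) {
		fmt.Printf("%s: region %d on store %d\n", op.kind, op.region, op.store)
	}
}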
Collecting information
Scheduling relies on information about the whole cluster. Simply put, we need to know the state of every TiKV node and the state of every Region. The TiKV cluster reports two kinds of messages to PD:
Each TiKV node periodically reports the overall state of the node to PD.
There is a heartbeat between each TiKV node and PD. On one hand, PD uses the heartbeat to detect whether each Store is alive and whether new Stores have joined; on the other hand, the heartbeat also carries the Store's state information, mainly including:

  • Total disk capacity
  • Available disk capacity
  • The number of Regions carried
  • Data write speed
  • Number of Snapshots being sent/received (Replicas may synchronize data via Snapshot)
  • Whether it is overloaded
  • Label information (labels are a series of tags with a hierarchical relationship)

The Leader of each Raft Group periodically reports information to PD.
There is a heartbeat between each Raft Group's Leader and PD, used to report the state of this Region, mainly including the following information:

  • Leader's location
  • The location of Followers
  • The number of down Replicas
  • Data write/read speed
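For illustration, the two kinds of heartbeat can be pictured as structs like the ones below; the field names are made up for this sketch and do not match the actual protobuf definitions:

package main

import "fmt"

// storeHeartbeat sketches the node-level state a TiKV Store reports to PD.
type storeHeartbeat struct {
	StoreID        uint64
	Capacity       uint64 // total disk capacity, in bytes
	Available      uint64 // available disk capacity, in bytes
	RegionCount    int
	BytesWritten   uint64
	SendingSnaps   int
	ReceivingSnaps int
	IsBusy         bool
	Labels         map[string]string // e.g. zone / rack / host
}

// regionHeartbeat sketches what each Raft Group Leader reports to PD.
type regionHeartbeat struct {
	RegionID     uint64
	Leader       uint64   // store holding the Leader replica
	Followers    []uint64 // stores holding Follower replicas
	DownReplicas []uint64
	BytesWritten uint64
	BytesRead    uint64
}

func main() {
	hb := storeHeartbeat{StoreID: 1, Capacity: 1 << 40, Available: 1 << 39, RegionCount: 1024}
	fmt.Printf("store %d: %d regions, %d%% space free\n",
		hb.StoreID, hb.RegionCount, 100*hb.Available/hb.Capacity)
}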

PD continuously collects information about the whole cluster through these two kinds of heartbeat messages and uses it as the basis for decisions. In addition, PD can accept extra information through its management interface to make more accurate decisions. For example, when a Store's heartbeats stop, PD cannot tell whether the node has failed temporarily or permanently; it can only wait for a while (30 minutes by default), and if no heartbeat arrives it considers the Store offline and decides to schedule all Regions on that Store elsewhere. But sometimes an operator takes a machine offline deliberately; in that case the operator can tell PD through the management interface that the Store is unavailable, and PD can immediately decide to move the Regions off that Store.
Scheduling strategy
After the PD has collected this information, it also needs some strategies to develop a specific scheduling plan.
A Region has the correct number of Replicas
When PD finds, through the heartbeat of a Region Leader, that the number of Replicas in this Region does not meet the requirement, it adjusts the Replica count with Add/Remove Replica operations. Possible causes are:

  • A node went down and all its data was lost, leaving some Regions with too few Replicas
  • A downed node came back into service and automatically rejoined the cluster, so a Region whose Replicas had already been replenished now has one Replica too many and needs one deleted
  • The administrator adjusted the replica policy and changed the max-replicas configuration

Multiple Replicas in a Raft Group are not in the same location
Note that the second point says "the same location", not "the same node". In general, PD only guarantees that multiple Replicas do not land on one node, to avoid losing multiple Replicas due to a single node failure. In real deployments, however, the following requirements may also arise:

  • Multiple nodes are deployed on the same physical machine
  • TiKV nodes are distributed across multiple racks, and system availability is guaranteed when a single rack is powered down
  • TiKV nodes are distributed among multiple IDCs. When power is lost to a single computer room, the system is also available.

These requirements are essentially about the location attribute of a node, which forms a minimum fault-tolerant unit: we do not want multiple Replicas of one Region inside a single such unit. To achieve this, you can configure labels for the nodes and configure location-labels on PD to declare which labels are location identifiers. When allocating Replicas, PD will then ensure that no two Replicas of a Region are on nodes that share the same location identifier.
Even distribution of copies between stores
As mentioned earlier, the upper limit on the amount of data each Region copy stores is fixed, so keeping the number of copies balanced across nodes makes the overall load more balanced.
Leaders are evenly distributed across Stores
In the Raft protocol, reads and writes go through the Leader, so the computational load is mainly on the Leaders, and PD spreads the Leaders across the nodes as much as possible.
Access hotspots are evenly distributed across Stores
Each Store and each Region Leader carries information about the current access load when reporting, such as the read/write rate of Keys. PD detects access hotspots and spreads them across the nodes.
Storage space is used roughly evenly across Stores
Each Store starts with a Capacity parameter that indicates its storage limit; when scheduling, PD takes the remaining storage space of each node into account.
Control scheduling speed to avoid affecting online services
Scheduling operations consume CPU, memory, disk IO, and network bandwidth, so we need to avoid impacting online services too much. PD controls the number of operations in progress at a time, and the default rate control is conservative. If you want faster scheduling (for example the service has been stopped for an upgrade, or a new node has been added and you want data rebalanced as soon as possible), you can speed it up manually with pd-ctl.
Support manual offline nodes
After a node is taken offline manually through pd-ctl, PD schedules the data off that node under rate control; once the scheduling is complete, the node is marked offline.
Scheduling implementation
After understanding the above information, let's take a look at the entire scheduling process.
PD continuously collects information through the heartbeats of Stores and Leaders to obtain detailed data about the whole cluster, and generates sequences of scheduling operations based on this information and the scheduling policies. Each time it receives a heartbeat from a Region Leader, PD checks whether there is a pending operation for that Region; if so, it returns the operation to the Region Leader in the heartbeat reply and monitors the result in subsequent heartbeats. Note that these operations are only suggestions to the Region Leader; there is no guarantee they will be executed. Whether and when they are executed is decided by the Region Leader itself based on its current state.
TiDB monitoring
TiDB monitoring framework
Prometheus is responsible for collecting and storing time-series data. PushGateway receives data pushed by clients and exposes it for Prometheus to pull. AlertManager implements the alerting mechanism.


Important monitoring indicators
TiDB Server

  • Query processing time, from which latency and throughput can be seen
  • DDL process monitoring
  • TiKV client related monitoring
  • PD client related monitoring

PD Server

  • The total number of times the command was executed
  • The total number of times a command failed to execute
  • Time-consuming statistics for successful execution of a command
  • Time-consuming statistics for a command execution failure
  • Time-consuming statistics for a command to complete and return results

TiKV Server

  • GC monitoring
  • The total number of times a KV command was executed
  • Time-consuming statistics of the Scheduler execution command
  • Total number of Raft propose commands
  • Time-consuming statistics of Raft execution commands
  • The total number of times Raft failed to execute a command
  • The total number of times Raft handles the ready state

For details, see: pingcap.com/docs-cn/op-
Backup and recovery
Data migration (migrating MySQL data to TiDB)
TiDB cluster deployment
