What you need to know about ClickHouse architecture in order to use it effectively. Alexey Zatelepin (2018)
ClickHouse is a high-performance open-source analytical database developed at Yandex. It was originally created for the needs of Yandex.Metrica, but has gradually found many applications both within Yandex and at other companies. I will explain how ClickHouse works internally, with an emphasis on the consequences of its architectural choices from the point of view of an application developer.
The following topics will be covered:
- How ClickHouse stores data on disk and executes queries, why this storage model speeds up analytical queries by several orders of magnitude but is poorly suited for OLTP and key-value workloads.
- How replication and sharding work, how to achieve linear scaling, and what to do about eventual consistency.
- How to diagnose problems on the ClickHouse production cluster.
The idea of the talk is quite simple: if you use, or plan to use, some tool, it helps to have at least a general idea of what is going on inside, so you can avoid wrong decisions and make the right ones. That is the understanding of ClickHouse I will try to convey today.

I have been working on ClickHouse itself only recently. Before that, I spent several years at Yandex.Maps as an application developer. I worked a lot with databases, with Postgres, so I am not yet fully infected with the ClickHouse virus; I still remember what it means to be an application developer. But by now I understand the system quite well.

Let's start right away with the tasks. ClickHouse is not a special-purpose database, and you should not apply it everywhere, but its range of applications is quite wide. It can be summed up roughly as follows.
We have events, and they arrive constantly. I have written out some examples here. The first is Yandex.Metrica, the service ClickHouse was originally created for.
- These are some user actions on the site.
- Financial transactions, shopping in stores.
- DNS queries.
What do we want to do with these events? We want to store information about them and later make sense of it: build reports and analytics, look at them, and draw conclusions.
Any system that solves these problems has an ideology: what was important to its creators, and what they were willing to give up, to sacrifice.

For ClickHouse, the most important things are:
- Interactive queries. What does that mean? Execution within seconds, preferably under a second. Why is this important? First, when Yandex.Metrica displays a report, a user will not wait if it takes more than a second to load. But even if you are an analyst working with ClickHouse directly, it is very good when the answers to your queries come immediately. You can then ask many of them, stay focused, and immerse yourself in working with the data. It is a different quality of work.
- The query language is SQL. This has both pros and cons. The pros: SQL is declarative, so a simple query can be optimized very well, i.e. executed very efficiently; and all analysts know SQL, it is a very popular language. The con is that SQL is not very flexible: you cannot express arbitrary data transformations in it, which is why we have a lot of extensions and additional functions.
- We try not to aggregate anything in advance. When you build such a system, the temptation is to first think about which reports will be needed: my events will arrive, I will quietly aggregate them, and when a report is needed I will show it quickly. But there is a problem with this approach: once you have merged two events together, you can no longer distinguish them in any report. So, to maintain flexibility, we always try to store individual events and not aggregate anything in advance.
- Another important point, required of the application developer working with ClickHouse: you need to understand in advance what the attributes of your events are. You must identify them yourself and put those attributes into separate ClickHouse columns. Storing a free-form JSON or text blob that you just dump in and hope to parse later is better avoided, otherwise you will not get interactive queries.
Let us take a fairly simple example. Imagine we are building a clone of Yandex.Metrica, a web analytics system. We have a counter that we put on a site, identified by the CounterID column. We have a hits table into which we add page views. There is a Referer column, and much else besides: a hundred or so attributes in total.

We run a very simple query: group by Referer, count, sort by the count, and show the first 10 results.
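In ClickHouse SQL, the query just described might look like this (table and column names follow the example in the talk; the filter values are purely illustrative):

```sql
SELECT
    Referer,
    count() AS hits_count
FROM hits
WHERE CounterID = 1234            -- the site's counter
  AND Date >= '2018-05-01'        -- illustrative reporting period
GROUP BY Referer
ORDER BY hits_count DESC
LIMIT 10
```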
The query needs to run fast. How do we do that?

First of all, we need to read very quickly:

- The simplest thing here is the columnar organization: we store data by column, which lets us load only the necessary columns. For this query those are CounterID, Date, and Referer. As I said, there can be a hundred of them; loading them all would slow us down enormously.
- Since the data probably does not fit in memory, reads must be sequential. Of course, we do not want to read the entire table, so we need an index. But even for the small part we do need, reads should be sequential: we cannot afford to jump around the disk hunting for the data the query requires.
- And the data absolutely must be compressed. It compresses several times over, which saves a lot of disk bandwidth.
And once the data is read, it must be processed very quickly. ClickHouse does a lot for this:

- Most importantly, it processes data in blocks. What is a block? A block is a small part of a table, a few thousand rows in size. Why is this important? Because ClickHouse is an interpreter, and everyone knows interpreters are slow. But if the interpretation overhead is spread over several thousand rows, it becomes negligible. Processing a block at a time also lets us apply SIMD instructions. And it is very good for the CPU cache: if a block is pulled into the cache and processed there, it is much faster than if we had to go out to memory.
- And a host of low-level optimizations, which I will not go into.
Just as in ordinary classical databases, we choose the index by looking at the conditions that will appear in most queries. In our web analytics system, that will most likely be the counter: the owner of a counter will come and look at reports. And also the date, since they will look at reports for a certain period of time, or maybe for the counter's whole lifetime. Hence the index is (CounterID, Date).

How can we verify that it fits? We sort the table by (CounterID, Date) and look at the rows we need: they occupy a small contiguous area. That means the index will do; it will speed things up greatly.
But ClickHouse's index has peculiarities compared with familiar indexes. First, the table is physically sorted by the key; other systems also call this a clustered index. It means there can be only one such index per table. Second, although we call it the primary key, it does not enforce uniqueness. This must be remembered.

The next thing to understand is that the index is sparse: it does not index every row of the table, but only roughly every eight-thousandth one. Physically, it is the value of the primary key expression, written out once for every 8,192 rows. That is the default value we suggest, and it generally works well.
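As a sketch, the table from this example could be declared as follows. This uses current ClickHouse DDL syntax; at the time of the talk, MergeTree tables were always partitioned by month and used the older `MergeTree(Date, (CounterID, Date), 8192)` form. The granularity setting shown is the default discussed above:

```sql
CREATE TABLE hits
(
    CounterID UInt32,
    Date      Date,
    Referer   String
    -- ... plus the other ~100 attribute columns
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(Date)        -- monthly partitions, as in the talk
ORDER BY (CounterID, Date)         -- the sparse primary index
SETTINGS index_granularity = 8192; -- one index mark per 8192 rows
```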
With this index, what do we do when executing a query? We need to select the ranges of rows that may be relevant. In this case, we need counter 1234 and dates from May 31 on, but the index only has an entry for May 23, so we have to read everything starting from that mark, up to the entry where counter 1235 begins. So we read somewhat more records than necessary. For analytical tasks, where you need to read many rows anyway, this is not a problem. But if you need just one row, it does not work out so well: to find one row, you have to read about 8,000.
The columns themselves are stored ordered by the same primary key expression. Now a block must be assembled from them for processing. For this, there are files we call "mark files", which contain, for each primary key mark, pointers to the corresponding positions in each column.

How does reading work? Using the index, we determine between which primary key marks the rows of interest lie; using the mark files, we find what to read in each column. We read it, assemble a block, and send it down the query pipeline.

What is important here? A key-value scenario will not work well. In the picture, light gray is what we read and dark gray is what we actually need, and it may well be that you need one row but read a great deal.

And if the values in your columns are large, say 100 bytes or even a couple of kilobytes, then a single granule you read can take up quite a lot. So if your column values are large, it may make sense to reduce the index granularity somewhat.
I said that the table is kept sorted, but not how that is done. At the same time, we really want data arriving in ClickHouse to be available for reports immediately: a second after the insert, you already see it in your reports.

And there is a problem: the data arrives ordered roughly by time, i.e. events from the last few minutes, while we need it ordered by the primary key, i.e. by counter. Events for different counters arrive interleaved, and they have to be sorted somehow.
How is this done? ClickHouse solves it with the MergeTree table engine. The idea is roughly the same as an LSM tree: we maintain a small number of sorted pieces (parts); when there are too many, we take several and merge them into one, thus keeping the number small at all times.
This is roughly how it happens. The X axis is the insert number, which roughly speaking is the time of insertion; the Y axis is the primary key. Data arrives, marked in green, unordered.

What does ClickHouse do? First it sorts the data, then writes it to disk, and a new part appears. What is important here? The data from your insert goes straight to disk, with no buffering, so writing individual records is very expensive.

There are tools, such as Buffer tables in ClickHouse, that can mitigate the problem, but you still need to insert in large batches. We recommend no more than one insert per second, and at least 1,000 records each, otherwise there will be a lot of disk churn and everything will work very poorly.
So we have a new part, and then something else arrives. The number of parts must be kept down. This happens in the background, in a process called a merge.

ClickHouse has one peculiarity: only parts that were inserted consecutively get merged. In our example, there was a part covering inserts M through N, and the small part from the previous slide, inserted at number N + 1.

We merge them and get a new, sorted part covering M through N + 1.

What is important here? This process runs in the background, and it must be watched: if something goes wrong, for example it slows down, or you start inserting too often and merging cannot keep up, then sooner or later everything will break.
What does that look like in ClickHouse? When a single partition (a partition is a month) accumulates 200 parts, your inserts suddenly slow down: ClickHouse throttles them on purpose to let the merges catch up with the inserts. And if there are already 300 parts, it simply refuses inserts, because reading from that many parts would be too expensive. So if you use ClickHouse, be sure to monitor it. Configure ClickHouse to export metrics to Graphite (it is very easy to do) and track the number of parts per partition. If the number is large, you need to investigate: maybe something is wrong with the disk, or you have started inserting too frequently. It needs to be fixed.
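The number of active parts per partition can also be checked directly from the `system.parts` system table; a query along these lines is a common ad-hoc check:

```sql
SELECT
    database,
    table,
    partition,
    count() AS active_parts
FROM system.parts
WHERE active              -- only parts that have not been merged away
GROUP BY database, table, partition
ORDER BY active_parts DESC
LIMIT 10
```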
All is well. We have one ClickHouse server, and everything works. But sometimes one server is not enough.

- The most common case: the data does not fit on one server. But that is not the only one.
- For example, you want to speed things up by adding hardware.
- Or the ClickHouse cluster has become so popular that many simultaneous queries start to interfere with each other, and you need to add more hardware.
What does ClickHouse offer? Sharding the data and using Distributed tables.

What does that mean? Sharding is straightforward: you have a table, you set up several machines, and each machine stores part of the data. In my picture, these are the local_table tables.

What is a Distributed table? It is a view over the local tables: it stores no data itself and acts as a proxy that forwards queries to them. It can be created anywhere, even on a machine separate from the shards, but the standard way is to create it on every shard. You then connect to any shard and issue your query.

What does it do? A query arrives as a SELECT from distributed_table. The Distributed table rewrites distributed_table to local_table and sends the query to all shards at once.

The shards process the query, and they try to take it as far as possible in order to send less data over the network: if there is an aggregation, it is partially performed on the shards, which send partially aggregated results to the Distributed table. The Distributed table merges them and sends the complete result to the user.
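The setup just described can be sketched like this. The cluster name `my_cluster` is an assumption; it would have to be defined in the `remote_servers` section of the server configuration:

```sql
-- On each shard: the local table that actually stores the data.
CREATE TABLE local_table
(
    CounterID UInt32,
    Date      Date,
    Referer   String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(Date)
ORDER BY (CounterID, Date);

-- The proxy table; it stores nothing itself.
-- rand() is one possible sharding key for writes through it.
CREATE TABLE distributed_table AS local_table
ENGINE = Distributed(my_cluster, default, local_table, rand());
```

A `SELECT ... FROM distributed_table` is then rewritten to run against `local_table` on every shard, exactly as described above.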
Here is an amusing benchmark: a little over a billion rows, the New York taxi ride data. It is publicly available; you can try it yourself.

Run some query, say the average fare depending on the number of passengers. A typical analytical query. On one machine it takes just over a second. With three machines it runs three times faster. And if you have 140 machines, you can spread the data over all 140 and the query will run very quickly indeed, in a few milliseconds. Of course, that is no longer a 140x speedup, because network latencies start to play a role, but still: the query can be accelerated nearly to the limit.
Now, how do you distribute the data across the shards?
The simplest option, and the one we recommend, is to distribute the data into the local tables manually, because a Distributed table does not care how the data is sharded when it runs a query: it simply asks all shards at once. The main thing is that the data must not be duplicated, otherwise you will get nonsense in your results. Beyond that, you can distribute the data across shards however you prefer.
But, in principle, the Distributed table can also do this itself, although with a few nuances.

First, writes through it are asynchronous: when you insert into a Distributed table, it first puts the data into a temporary folder.

Then it tries to forward the insert. For this it needs a sharding key: it computes the key and takes it modulo the number of shards (weights can also be assigned), selects the shard, and inserts the data there.
What is important here? The sharding key can even be random, and that will work too. The only exception: if you want to run complex JOINs and need the data being joined to live on the same shard, then you do need to think about the sharding key.
So we have a ClickHouse cluster. It is big and fast, but sometimes it breaks. Disks fail, and you do not want to lose data. Sometimes there are simply network problems, yet the report still needs to be shown, and the constantly arriving event data cannot be lost either. So we need availability for both reads and writes.

To solve this, ClickHouse offers asynchronous master-master replication that works at the table level: ReplicatedMergeTree. On the same server you can have both replicated and non-replicated tables.

The picture may look like complete chaos, but the concepts are the same: these are the same parts I talked about in the previous section, partially sorted pieces of data, and the replicas try to synchronize their sets of parts with one another.
Three types of events can occur:
- INSERT - data is inserted into a replica
- FETCH - one replica downloads a part from another
- MERGE - a replica takes several parts and merges them into one
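A replicated table is declared by giving the engine a ZooKeeper path shared by all replicas and a per-replica name. A sketch; the `{shard}` and `{replica}` macros are assumptions that would be defined in each server's configuration:

```sql
CREATE TABLE hits_replicated
(
    CounterID UInt32,
    Date      Date,
    Referer   String
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/hits',  -- ZooKeeper path, identical on all replicas
    '{replica}'                         -- unique name of this replica
)
PARTITION BY toYYYYMM(Date)
ORDER BY (CounterID, Date);
```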
How does an insert happen? You insert into any replica. Here you can see that Replica 1 is lagging, but you can still insert into it. Information about the insert is recorded in ZooKeeper, so for replication to work you will have to run ZooKeeper.

A total order over inserts is still maintained. All replicas see the same set of parts; each sees the holes, the parts it does not yet have, and tries to fill them with fetches.

Next, merges still have to be performed, and they must be done consistently, otherwise the sets of parts would diverge. For this, one replica becomes the leader. We avoid calling it a master, because "master" suggests the only replica you can write to, and that is not true. So here Replica 2 is the leader: it decides that certain parts should be merged and records this in ZooKeeper; the other replicas receive this information and perform the same merge.

The replicas also constantly compare checksums with each other. If something does not match, a replica discards its part and downloads it again; they try to keep the data set byte-identical. This place also needs monitoring: watch how replication is doing and what the lag is, so that, God forbid, it does not break.
Any discussion of replication runs into the CAP theorem: if you have some place where you read and write, and you replicate it, then in case of a network failure you must make a choice: either you keep reading and writing, or you insist on correct data.

ClickHouse, like any system with asynchronous replication, has no consistency by default. Insert data into one replica, and on another it may appear a couple of seconds later, or maybe more. The green asterisk is the good news: you can turn consistency on by specifying a setting on insert and on read, and then reads will be consistent. But, of course, you pay for it with performance.
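In current ClickHouse, the settings being alluded to are along these lines (a sketch; the values are illustrative):

```sql
-- When inserting: wait until the insert reaches a quorum of replicas.
SET insert_quorum = 2;

-- When reading: only see data that was written with quorum,
-- giving sequentially consistent reads at the cost of performance.
SET select_sequential_consistency = 1;
```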
Availability is almost there. How do you make an indestructible ClickHouse cluster? Take three data centers: put ZooKeeper in all three, and replicas in at least two. Then if one location blows up, everything keeps working for both reads and writes.

People often ask: "And how do you do it with two data centers?" Two does not work well because of ZooKeeper. If you have only two locations, you declare one data center the primary. If the non-primary one goes down, everything keeps working; if the primary goes down, you are in read-only mode.

Why only "almost" available, why the red asterisk? Strictly speaking, there is no full availability, because a server cut off from the ZooKeeper quorum (two nodes out of three) cannot accept writes. It can still serve reads, though the data may lag slightly.
These two features, Distributed tables and replicated tables, are independent and can be used separately, but they work very well together. A typical ClickHouse cluster looks like this: N shards, each replicated three ways. A Distributed table understands that shards consist of replicas and can send a query to just one replica per shard. And it is fault tolerant: if one replica is unavailable, it goes to another.

There is also one more way to mitigate the lack of consistency: you can set a maximum allowed lag. If the Distributed table comes to a replica whose lag is too large, it will try another one. Such a cluster works well.
What is ClickHouse?
- It is a column-oriented database that executes analytical, interactive queries very quickly.
- The query language is SQL with extensions.
- It is a poor fit for OLTP, because there are no transactions. And for key-value, because the index is sparse: if you need a single row, you will read a lot of unnecessary data, and with large blobs as values it works especially poorly.
- Scales linearly if you shard and use distributed tables.
- It is fault tolerant when you use replicated tables.
- And all this in open source with a very active community.
Hello! My name is Dmitry. Thanks for the talk! I have a question about data duplication. As I understand it, ClickHouse cannot solve this problem entirely by itself. Does it have to be handled at insertion time, or are there ways to fight duplicate data inside the database?
Where can duplicate data come from? If the process inserting the data is fault tolerant, it retries. And it can happen that, bang, the connection to ClickHouse drops mid-insert, and the inserter keeps retrying without knowing whether the insert succeeded. At that moment duplication could of course occur, but in ClickHouse it does not. How do we protect against it?
Inserts happen in blocks, and ZooKeeper stores checksums of the last hundred blocks. This is configurable, but 100 is a good default. So if you inserted a block, something blew up, and you are not sure whether it went through, insert it again. If the original insert succeeded, ClickHouse will detect this by the checksum and will not duplicate the data.
I.e., if we insert 10,000 rows at a time, will the last million or so rows be guaranteed not to be duplicated?

No, deduplication does not work at the row level.

I mean, the block we insert is 10,000 rows. So can we expect that the last hundred blocks, about a million rows, will not be duplicated if we retry them?

Yes, but only if you re-insert exactly the same blocks.

I.e. identical blocks, yes?

Yes, the checksum is computed over an identical block. If the checksum matches, the block has already been inserted and is not inserted again.
Got it. And the second question. I am interested in Distributed table queries over replicated tables. As I understand it, a query goes to only one replica. Is there a way to configure heavy queries to go to both replicas, so that each of them processes part of the data?

Yes, you can configure that. It is another way to use more machines. There is a special setting, called max_parallel_replicas I believe. What does it do? It processes half the data on one replica and half on the other. But I must add a caveat right away: this only works if a sampling key was specified when the table was created. ClickHouse has this feature, a sampling key: it lets you run a query over, say, one tenth of the data instead of all of it. If the replicated table has a sampling key, then with max_parallel_replicas set, ClickHouse realizes it can compute one half of the data on one replica and the other half on the second, and will use both.
Won't that also apply sampling to the result?

If you did not ask for a sample in the query, there is no sampling. The key is simply used to split the work between the two replicas.
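The setup being described could be sketched like this: the table declares a sampling key, and the setting splits one query's work across replicas. The hashed `UserID` sampling column is an assumption for illustration:

```sql
CREATE TABLE hits_sampled
(
    CounterID UInt32,
    Date      Date,
    UserID    UInt64,
    Referer   String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(Date)
-- the sampling expression must be part of the primary key
ORDER BY (CounterID, Date, intHash32(UserID))
SAMPLE BY intHash32(UserID);

-- Each replica of a shard then processes its own slice of the data.
SET max_parallel_replicas = 2;
SELECT count() FROM hits_sampled WHERE CounterID = 1234;
```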
Got it, thanks!
Thanks for the talk! I have three questions. You said the index is sparse, with granules of 8,000-odd rows, and that you need to write in large batches. Do you use any kind of pre-buffering? Do you recommend anything?

This is a sore point, because everyone wants to insert one row at a time. In Metrica, for example, we have a rather smart batcher, developed by a separate team, that does a lot of additional work; ClickHouse itself does not include it.

What does ClickHouse have? There are Buffer tables. You still should not insert single rows into them, because that is slow for other silly reasons, but inserting through them causes less disk churn: it does not write to disk on every insert. And many people who use ClickHouse more seriously use Kafka: writes go into Kafka, and a consumer takes records from Kafka and inserts them into ClickHouse in batches. That is also a good option.
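A Buffer table wraps a destination table and flushes to it when time or size thresholds are hit. A sketch, with illustrative thresholds; the engine's arguments are database, table, number of buffer layers, then min/max time in seconds, min/max rows, and min/max bytes:

```sql
CREATE TABLE hits_buffer AS hits
ENGINE = Buffer(default, hits,
                16,                  -- number of independent buffer layers
                10, 100,             -- min/max seconds before flushing
                10000, 1000000,      -- min/max rows before flushing
                10000000, 100000000  -- min/max bytes before flushing
);

-- Clients insert into hits_buffer; ClickHouse flushes to hits in batches.
INSERT INTO hits_buffer VALUES (1234, '2018-05-01', 'https://example.com/');
```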
Yes, so that can be configured manually. One more question. We have a Distributed table that fronts all the shards. Suppose the shard hosting the Distributed table dies. Does that mean all our data has died?

A Distributed table stores nothing. If it dies, you simply re-create it and everything works again. The main thing is to preserve the local_tables where the data actually lives; those, of course, need to be replicated.
And the last question. You said that you can shard manually. Are there any transactional guarantees: I wrote here, wrote there, and if the third write failed, cancel it all? Say I am writing to three different shards.

If a piece of data arrives, you should write it to exactly one shard. The main thing is that there is no duplication: you wrote it in one place, and everything is fine.

I.e. I have to keep track of where I wrote it?

You do not need to store that, because the Distributed table queries all shards anyway. When you have 500 servers, as we do, this stops scaling, because fanning every query out to 500 servers at once is not great. So there we use two-level sharding: the second level knows where it put the data and targets it. But up to about 100 servers, simply querying all the servers and merging the results back is more than enough.
Thank you very much!
How do you change data? Is it possible to modify something, for example remove a whole block and re-upload it, without rebuilding the entire table from scratch?

Yes, you can replace a whole block, or rather not a block but an entire partition. Right now a partition is a month, but making partitioning arbitrary is a priority for us. You say "alter table drop partition" and the partition is removed, and then you can backfill the data.
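A sketch of the drop-and-backfill pattern just described, using the hits table from earlier; `hits_corrected` is a hypothetical staging table holding the corrected data, and 201805 is the YYYYMM ID of a monthly partition:

```sql
-- Remove the whole month...
ALTER TABLE hits DROP PARTITION 201805;

-- ...then re-insert the corrected data for that month.
INSERT INTO hits
SELECT * FROM hits_corrected WHERE toYYYYMM(Date) = 201805;
```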
There are also cases where you have data that needs to be updated: it exists and keeps changing slightly in real time. For example, not hits but user visits: a user goes to one page, then another, and the visit length keeps growing. ClickHouse has an engine for this called CollapsingMergeTree. It is not very convenient to use, but it solves the problem. When you want to change something, you write two records: the first cancels the previous record, the second carries the new data. The table then optimizes this away somewhere in the background. In this way you can change data a little.

You will need to store the current version of each row somewhere yourself. You could, of course, go to ClickHouse and ask "what is the latest row for this key?", but that would already be slow.
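The CollapsingMergeTree pattern just described can be sketched like this. The `Sign` column marks a row as either a live record (+1) or a cancellation of an earlier one (-1); table and column names are illustrative:

```sql
CREATE TABLE visits
(
    VisitID  UInt64,
    Duration UInt32,
    Sign     Int8   -- +1 = data row, -1 = cancels a previous row
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY VisitID;

-- Initial state of the visit.
INSERT INTO visits VALUES (42, 30, 1);

-- The visit grew: cancel the old row and write the new state.
INSERT INTO visits VALUES (42, 30, -1), (42, 45, 1);

-- Background merges collapse the +1/-1 pairs away; until then,
-- queries can collapse them explicitly:
SELECT VisitID, sum(Duration * Sign) AS Duration
FROM visits
GROUP BY VisitID
HAVING sum(Sign) > 0;
```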
And a parallel question. Do I understand correctly that if I need replication across two locations rather than three, it is simpler to put Kafka in front, write to two databases from it, and not bother with replication? I.e. when there are two data centers and no third.

We would still recommend replicated tables, even though the Distributed table is also able to replicate the writes itself.

But how do you make replicated tables work across two DCs? As I understood, if the primary falls, you cannot write anywhere, only read. Or are there simple ways: promote the slave to master, and then catch the former master up from it?
And what about Kafka?

With Kafka I would feed each copy independently. But with Kafka you would still end up needing three DCs.

Kafka also uses ZooKeeper.

It needs fewer resources: it only stores a couple of days of data, not the whole history, so Kafka needs less. Running it across three DCs is cheaper than running ClickHouse across three DCs.

You do not have to split ClickHouse itself across three either; you only need to put ZooKeeper in the third.

Ah, right: only ZooKeeper needs the third DC for quorum, and the data itself is stored just twice, in two DCs.
Why do we still recommend replicated tables rather than just writing into two tables? Because there are many checks that the data is exactly identical. If you do the writes yourself, or through a Distributed table, it is easy to end up with some discrepancy, and it will never be corrected: the writes have already happened, and goodbye, and it is not even clear whether the copies have diverged. Replicated tables constantly work to maintain this commonality, a single set of parts.
My question is about memory utilization. How do you use memory most effectively? How much should be allocated per instance?

About memory: ClickHouse has a setting called max_memory_usage, which is how much a single query may consume while being processed. What else is memory needed for? The disk cache. ClickHouse has no clever cache of its own. Some systems read from disk with O_DIRECT and cache the data themselves; ClickHouse does not. So a rather large part of memory must be left to the OS disk cache: data recently read from disk will then be served from memory. The remaining part you assign to the memory needed by queries.

Why does ClickHouse consume memory at all? If you have a streaming query, one that just scans the data and computes something like a count, it consumes very little.

Where can memory be needed? In a GROUP BY with many keys. For example, you group by those same referrers, and you have many distinct referrers and URLs; all those keys have to be kept in memory. If you have enough memory, fine; if not, there is an option to execute GROUP BY on disk, but it is much slower.
Will it figure that out by itself?
You need to enable it.
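In current ClickHouse, the relevant settings are along these lines (values are illustrative; a common rule of thumb is to set the spill threshold to about half of max_memory_usage):

```sql
-- Allow a single query to use up to ~20 GB of RAM...
SET max_memory_usage = 20000000000;

-- ...and start spilling GROUP BY state to disk at ~10 GB.
SET max_bytes_before_external_group_by = 10000000000;
```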
Is there a particular amount you recommend, say 32 GB per node, for effective use?

The more the better. We have 128 GB.

And one instance uses all 128, right?

Yes, of course it will use all of it. That said, if your data is not that large, you do not need that much: take about as much memory as you have data. But if your data does not fit in memory, the more the better.
Alexey, thank you for the talk! Have you measured the performance degradation caused by file system fragmentation?

I cannot give specific figures right now. The problem certainly exists: when the disks are almost full, files are no longer laid out contiguously, and fragmentation begins. You just have to make sure your disks do not get too full, and if they start to fill up, add a new node. But I cannot give specific figures.

At least a rough order of magnitude?

I do not know; up to about 70 percent full should be fine.
Good afternoon! I have two questions. As far as I know, ClickHouse currently has only an HTTP interface for exchanging data with clients. Is there a roadmap for a binary interface?

We have two interfaces. There is HTTP, which any HTTP client can use, and which the JDBC driver uses. And there is a native interface, which we always considered private and did not want to publish a client library for. But the interface is not that complicated, and we maintain forward and backward compatibility there, so kind people have used the source code as documentation: Go, I have heard, has an excellent driver that speaks the native protocol. For C++, my colleague made a separate driver that lets you use the native interface without linking against all of ClickHouse. Other languages probably have something too; I do not know for sure. So formally we consider it private, but in practice it is already public, and some languages can use it.
Thank you! You said you wrote your own data replication system. Impala, for instance, uses HDFS to replicate data; it does not do replication itself. Why did you write your own replication; how is it better than HDFS?

An interesting question. ClickHouse developed in layers: at first there were plain MergeTree tables, then replication appeared for them. And this scheme, with parts that get merged, simply does not map onto HDFS. You could probably use HDFS as a file system and store the parts there, but it is simpler to do this on the local file system.
I.e. you did not compare them directly?

We need the merging of parts to be an integral part of replication as well, so I do not think any ready-made solution could be used for this.
One piece - this will be one block on HDFS and there will be an operation * opened * , if possible.
I.e. use HDFS as the storage?

Yes, so that you do not implement your own replication: you write to HDFS, consider the data replicated, and fault tolerance is taken care of.

But we want to read from the local disk.

HDFS supports local reads: you can find out which nodes store the data and run the reads where the data is.

An interesting thought; worth thinking about.