

In 2013 I joined Mail.ru Group and was working on a problem that required a queue. There are many different tools for building queues, but I decided to start by finding out what the company already had. I had heard about a product called Tarantool. I looked into how it works, and it seemed to me that a queue broker could be built on top of it perfectly.


I went to the lead Tarantool developer, Kostya Osipov, and tried to explain what I wanted to get. I assumed the queue code would be written in C, like the rest of Tarantool, but... The next day Kostya handed me a 250-line script that implemented almost everything I wanted.


From that moment, I fell in love with Tarantool. It turned out that you can write very little code in a very simple scripting language and get completely new functionality on top of this DBMS.


A lot of time has passed and Tarantool has evolved, partly under the influence of our requests, but the basic ideas and approaches have remained. I will show how to implement your own queue on a modern Tarantool, for example version 2.2.


At that time I was familiar with several queue implementations, and I liked the simple and fast Beanstalkd. It has a fairly convenient interface, it tracks the connection that owns a task (if the client disconnects, the task is returned to the queue), and it has convenient facilities for working with deferred tasks. When implementing my queue, I wanted to get something similar.


The service itself can be represented as follows: there is a queue broker process that accepts and stores tasks, and there are clients: producers, who submit tasks (the put method), and consumers, who take tasks for processing (the take method).


[diagram: queue broker with producers and consumers]

The life cycle of a single task can be described by the following scheme. A task appears via the put method and enters the ready state. The take operation moves the task to taken. From taken, the task can either be acknowledged as processed (ack) and deleted, or returned to ready (release).


[diagram: task life cycle — ready/taken]
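The life cycle above can be captured in a tiny transition table. This is an illustrative sketch in plain Lua (the transition table and the apply helper are my own names, not part of the queue code we build below):

```lua
-- Hypothetical sketch of the task life cycle described above.
-- Allowed transitions: ready -> taken (take); taken -> deleted (ack) or ready (release).
local transitions = {
    ready = { take = 'taken' },
    taken = { ack = 'deleted', release = 'ready' },
}

-- Apply an operation to a task, erroring on illegal transitions.
local function apply(task, op)
    local next_state = transitions[task.status] and transitions[task.status][op]
    if not next_state then
        error(("cannot %s a task in status %s"):format(op, task.status))
    end
    task.status = next_state
    return task
end

local task = { id = 1, status = 'ready' }
apply(task, 'take')     -- ready -> taken
apply(task, 'release')  -- taken -> ready
apply(task, 'take')     -- ready -> taken again
apply(task, 'ack')      -- taken -> deleted
```

Note that ack on a ready task is simply not in the table, which mirrors the rule that only taken tasks may be acknowledged.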

We can also extend this diagram and additionally introduce deferred task processing:


[diagram: extended life cycle with deferred tasks]

Preparing the environment


Tarantool today is, among other things, a LuaJIT interpreter. To start working with it, you create a start file, init.lua, the entry point, and put the box.cfg{} call there, which launches the DBMS internals.


For local development, it only remains to connect and start the console. Create this file and run it:


require'strict'.on()
box.cfg{}
require'console'.start()
os.exit()

The console is interactive; you can try things in it immediately. There is no need to spend a long time installing, configuring and learning tools: just write 10-15 lines on any local machine.


I also recommend enabling strict mode right away. Lua is rather lax about variable declaration, and this mode will help you catch some errors. By the way, if you build Tarantool yourself in Debug mode, strict is enabled by default.


Next, just run our file using tarantool:


tarantool init.lua 

You should see something similar:


2020-07-09 20:00:11.344 [30043] main/102/init.lua C> Tarantool 2.2.3-1-g98ecc909a
2020-07-09 20:00:11.345 [30043] main/102/init.lua C> log level 5
2020-07-09 20:00:11.346 [30043] main/102/init.lua I> mapping 268435456 bytes for memtx tuple arena...
2020-07-09 20:00:11.347 [30043] main/102/init.lua I> mapping 134217728 bytes for vinyl tuple arena...
2020-07-09 20:00:11.370 [30043] main/102/init.lua I> instance uuid 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.371 [30043] main/102/init.lua I> initializing an empty data directory
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> assigned id 1 to replica 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> cluster uuid 7723bdf4-24e8-4957-bd6c-6ab502a1911c
2020-07-09 20:00:11.425 [30043] snapshot/101/main I> saving snapshot `./00000000000000000000.snap.inprogress'
2020-07-09 20:00:11.437 [30043] snapshot/101/main I> done
2020-07-09 20:00:11.439 [30043] main/102/init.lua I> ready to accept requests
2020-07-09 20:00:11.439 [30043] main/104/checkpoint_daemon I> scheduled next checkpoint for Thu Jul 9 21:11:59 2020
tarantool>

Writing a queue


Let's create a separate queue.lua file for our application. Of course, we could write everything directly in init.lua, but a separate file is more convenient to work with.


Connect queue.lua as a module from the init.lua file:


require'strict'.on()
box.cfg{}
queue = require 'queue'
require'console'.start()
os.exit()

All further modifications will be made in queue.lua.


Since we are building a queue, we need to store task information somewhere. Let's create a space: a table for the data. It can be created without options, but we will add a few right away. So that restarts work normally, we specify that the space should be created only if it does not already exist (if_not_exists=true). Modern Tarantool also lets you specify a format for the fields describing their content, which is good practice, so we will do that too.


The structure will be very simple: I only need task ids, their statuses, and some arbitrary data; it does not matter to me what exactly is stored there. You cannot work with the data without a primary index, so we immediately create an index on the id. Make sure the field type matches both the format and the index.


box.schema.create_space('queue', { if_not_exists = true })

box.space.queue:format({
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'data';   type = '*' },
})

box.space.queue:create_index('primary', {
    parts = { 1, 'number' };
    if_not_exists = true;
})

Let's declare a table, queue, that will hold our functions, attributes and methods. To start, we declare two functions: put a task (queue.put) and take a task (queue.take).


Tasks in the queue have states. To denote the status, we keep a separate table of statuses. You can use numbers or strings as values, but I like single-letter values: they can be chosen to be semantically meaningful, and they take up minimal storage space. To begin with, we will have two statuses: READY and TAKEN.


local queue = {}

local STATUS = {}
STATUS.READY = 'R'
STATUS.TAKEN = 'T'

function queue.put(...)
end

function queue.take(...)
end

return queue

How do we implement put? Very simply: we generate an id and insert the data into the space with the READY status. There are many ways to generate an identifier; we will take clock.realtime64(). For a queue it is good because the order of messages is determined automatically (but keep in mind that if the clock is moved back, the order of tasks may be violated). Also, in theory, a task with the same value may already be in the queue. Therefore we check whether a task with this id already exists and, in case of a collision, simply take a fresh clock value. This takes microseconds, and the situation is extremely unlikely, so performance will not suffer.
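The collision-retry idea can be seen in isolation with a mocked clock. This is a plain-Lua sketch: realtime64 and the used set below are illustrative stand-ins for clock.realtime64() and the space lookup, not the real Tarantool API:

```lua
-- Fake clock: returns the same value twice (simulating a clock that
-- did not advance), then starts advancing.
local fake_time = 100
local calls = 0
local function realtime64()
    calls = calls + 1
    if calls <= 2 then return fake_time end
    return fake_time + calls
end

-- Stand-in for box.space.queue:get(): ids already present in the queue.
local used = { [100] = true }

-- Same retry loop as in the article: regenerate until the id is free.
local function gen_id()
    local new_id
    repeat
        new_id = realtime64()
    until not used[new_id]
    used[new_id] = true
    return new_id
end

local id = gen_id()  -- retries past the colliding value 100
```

The loop spins only while the generated value collides with an existing task, so in the normal case it costs a single clock read.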


We simply insert all the function's arguments as the payload of the task:


local clock = require 'clock'

function gen_id()
    local new_id
    repeat
        new_id = clock.realtime64()
    until not box.space.queue:get(new_id)
    return new_id
end

function queue.put(...)
    local id = gen_id()
    return box.space.queue:insert{ id, STATUS.READY, { ... } }
end

Once the queue.put function is written, we can restart Tarantool and call it right away. We see that the task is queued; it looks like a tuple. You can put arbitrary data, even nested structures, into it. The tuples in which Tarantool stores data are packed in MessagePack, which can represent such structures.


tarantool> queue.put("hello")
---
- [1594325382148311477, 'R', ['hello']]
...

tarantool> queue.put("my","data",1,2,3)
---
- [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.put({ complex={ struct="data" }})
---
- [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...

Everything we put is in the space. We can use space commands to see what is stored there.


tarantool> box.space.queue:select()
---
- - [1594325382148311477, 'R', ['hello']]
  - [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
  - [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...

Now we need to learn how to take tasks: let's implement the take function. To do this we work with the status. We take tasks that are ready for processing, that is, in the READY status. We could, of course, scan the primary key and find the first ready task, but under load, with a large number of processed tasks, this approach will not work. We need a separate index on the status field. One of the main features of Tarantool that distinguishes it from plain key-value databases is the ability to create various indexes, almost as in relational databases: on different fields, composite, of different types.


Create a second index, specifying status as the first field: this is what we will search by. The second field is id: it sorts tasks in ascending order within a single status.


box.space.queue:create_index('status', {
    parts = { 2, 'string', 1, 'number' };
    if_not_exists = true;
})

For the selection we use the built-in facilities. There is an iterator, pairs, that is applied to a space; we pass part of the key to it. Here we have a composite index of two fields: we search by the first and order by the second. We ask the system to find tuples whose first index part equals the READY status, and we receive them already ordered by the second part of the index. If something is found, we take the task, update it and return it. The update ensures that no one else arriving with the same take call can grab it. If there are no tasks, we return nothing.


function queue.take()
    local found = box.space.queue.index.status
        :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
    if found then
        return box.space.queue
            :update({ found.id }, { { '=', 2, STATUS.TAKEN } })
    end
    return
end

It is also worth noting that the top level of a tuple in Tarantool is an array. It has no names, only numbers, so for operations such as update, until very recently, you had to specify the field number. As a helper, let's build a small table mapping field names to numbers. To build it, we can use the format we have already described:


local F = {}
for no, def in pairs(box.space.queue:format()) do
    F[no] = def.name
    F[def.name] = no
end

For greater clarity, we can also rewrite the index definitions:


box.space.queue:format({
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'data';   type = '*' },
})

local F = {}
for no, def in pairs(box.space.queue:format()) do
    F[no] = def.name
    F[def.name] = no
end

box.space.queue:create_index('primary', {
    parts = { F.id, 'number' };
    if_not_exists = true;
})

box.space.queue:create_index('status', {
    parts = { F.status, 'string', F.id, 'number' };
    if_not_exists = true;
})

Now the whole take can be implemented:


function queue.take(...)
    for _, t in box.space.queue.index.status
        :pairs({ STATUS.READY }, { iterator = 'EQ' })
    do
        return box.space.queue:update({ t.id }, {
            { '=', F.status, STATUS.TAKEN }
        })
    end
    return
end

Let's check how this works. Put one task and call take twice. If at this point there is data in the space, it can be cleared with the truncate command:


tarantool> queue.put("my","data",1,2,3)
---
- [1594325927025602515, 'R', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.take()
---
- [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.take()
---
...

The first take returns the very task that we put. When we call take again, nothing is returned, because there are no more ready tasks (in the READY status). We can verify this by running select on the space:


tarantool> box.space.queue:select()
---
- - [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...

The consumer that takes a task must either confirm its processing or return it unprocessed if for any reason it fails; then someone else can pick it up. We implement two functions for this: ack and release. They take the task id and look it up. If the task status is taken, we proceed. The two functions are very similar: one deletes the processed task, the other returns it to the READY status.


function queue.ack(id)
    local t = assert(box.space.queue:get{id}, "Task not exists")
    if t and t.status == STATUS.TAKEN then
        return box.space.queue:delete{t.id}
    else
        error("Task not taken")
    end
end

function queue.release(id)
    local t = assert(box.space.queue:get{id}, "Task not exists")
    if t and t.status == STATUS.TAKEN then
        return box.space.queue:update({ t.id }, { { '=', F.status, STATUS.READY } })
    else
        error("Task not taken")
    end
end

Let's see how this works with all four functions. We put two tasks and take the first, then release it: it returns to the READY status. The second call to take grabs the same task; once we process it, it is deleted. The third take call takes the second task. Order is preserved, and a taken task is not handed out to anyone else.


tarantool> queue.put("task 1")
---
- [1594326185712343931, 'R', ['task 1']]
...

tarantool> queue.put("task 2")
---
- [1594326187061434882, 'R', ['task 2']]
...

tarantool> task=queue.take() return task
---
- [1594326185712343931, 'T', ['task 1']]
...

tarantool> queue.release(task.id)
---
- [1594326185712343931, 'R', ['task 1']]
...

tarantool> task=queue.take() return task
---
- [1594326185712343931, 'T', ['task 1']]
...

tarantool> queue.ack(task.id)
---
- [1594326185712343931, 'T', ['task 1']]
...

tarantool> task=queue.take() return task
---
- [1594326187061434882, 'T', ['task 2']]
...

tarantool> queue.ack(task.id)
---
- [1594326187061434882, 'T', ['task 2']]
...

tarantool> task=queue.take() return task
---
- null
...

The result is a correctly working queue, and we could already write a consumer to process tasks. But it has at least one problem: when we call take, the function immediately returns either a task or nothing. If you write a task-processing loop and run it, it will work, but it will spin idle, doing nothing except consuming CPU.


while true do
    local task = queue.take()
    if task then
        --...
    end
end

To fix this, we need the channel primitive (fiber.channel). It allows sending messages; in essence, it is a FIFO queue for communication between fibers. Our Lua code runs in a fiber: the fiber that puts a task, when a client comes over the network or we work from the console, needs some primitive to inform another fiber, which is waiting for a task, that a new one has arrived.


A channel works like this: it can have a buffer of N slots into which you can put messages even if no one is reading from the channel. You can also create a channel with no buffer capacity; then a message can only be placed into a slot someone is already waiting on. For example, we create a channel with a buffer of two elements, so it has two slots for messages. If one consumer is waiting on the channel, that adds a third slot. If we then put messages into this channel, three put operations will complete without blocking, and the fourth put will block the fiber doing the put. This is how inter-fiber interaction is organized. If you happen to know channels in Go, these work essentially the same way:


[diagram: channel with buffer slots and a waiting reader]
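The buffering behaviour can be sketched without fibers as a plain bounded buffer. This is a deliberate simplification in plain Lua: it ignores waiting readers, and where a real fiber.channel would block the calling fiber, this sketch just reports "would block"; all names here are illustrative:

```lua
-- Simplified model of a channel with a fixed buffer capacity.
local function new_channel(capacity)
    local ch = { buf = {}, capacity = capacity }
    -- put() succeeds while there is buffer space; a real channel
    -- would block the fiber instead of returning false.
    function ch:put(msg)
        if #self.buf >= self.capacity then
            return false  -- buffer full: would block
        end
        table.insert(self.buf, msg)
        return true
    end
    -- get() returns nil when empty; a real channel would block.
    function ch:get()
        return table.remove(self.buf, 1)
    end
    return ch
end

local ch = new_channel(2)
local ok1 = ch:put('a')  -- buffered
local ok2 = ch:put('b')  -- buffered
local ok3 = ch:put('c')  -- buffer full: would block in real fiber.channel
```

With a consumer already waiting on the real channel, one extra put would also succeed, which is the "third slot" described above.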

Let's slightly rework our take function. First, add a new argument, a timeout: we are ready to wait for a task for a certain time. We add a loop that searches for a ready task; if it finds none, it calculates how much longer it may wait.


We create a channel to wait on with this timeout. While the fiber "sleeps" waiting on the channel, it can be woken from outside by sending a message to that channel.


local fiber = require 'fiber'
queue._wait = fiber.channel()

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end
    return box.space.queue
        :update({ found.id }, { { '=', F.status, STATUS.TAKEN } })
end

In total: take tries to take a task and returns it on success. If no task was found, it waits out the remainder of the timeout. Moreover, the other side, which produces tasks, can wake this fiber.


To make experimenting convenient, we can require the fiber module globally in the init.lua file:


fiber=require 'fiber' 

Let's see how this works without waking the fiber. In a separate fiber we put a task after 0.1 s: at first the queue is empty, and 0.1 s after launch a task appears. At the same time we call take with a timeout of 3. After starting, take tries to find a task; finding none, it falls asleep for 3 s, then wakes up, searches again and finds the task.


tarantool> do
    box.space.queue:truncate()
    fiber.create(function()
        fiber.sleep(0.1)
        queue.put("task 3")
    end)
    local start = fiber.time()
    return queue.take(3), { wait = fiber.time() - start }
end
---
- [1594326905489650533, 'T', ['task 3']]
- wait: 3.0017817020416
...

Now let's make take wake up as soon as a task appears. To do this, take the earlier put function and add sending a message into the channel. Anything can be sent as the message; let it be true in this case.


Earlier I showed that put can block if there is not enough space in the channel. But a task producer does not care whether there are consumers on the other side; it must not block waiting for a consumer. Therefore it is logical to use a zero timeout for the put here: if there are consumers, that is, someone to notify about the new task, we wake them up; otherwise we do not count on the message staying in the channel. Alternatively, we can check whether the channel has active readers.


function queue.put(...)
    local id = gen_id()
    if queue._wait:has_readers() then
        queue._wait:put(true, 0)
    end
    return box.space.queue:insert{ id, STATUS.READY, { ... } }
end

After that, the same test code behaves completely differently. We create the task after 0.1 s, and take wakes up immediately and receives it. We have got rid of the hot loop that spun continuously waiting for a task. If we do not put a task, the fiber waits the full three seconds.


tarantool> do
    box.space.queue:truncate()
    fiber.create(function()
        fiber.sleep(0.1)
        queue.put("task 4")
    end)
    local start = fiber.time()
    return queue.take(3), { wait = fiber.time() - start }
end
---
- [1594327004302379957, 'T', ['task 4']]
- wait: 0.10164666175842
...

So far we have tested everything inside one instance; now let's work over the network. First of all, we need to turn our instance into a server. In the init.lua file, add the listen option to box.cfg: the port it will listen on. Along with that, we need to grant permissions. We will not examine privilege configuration in detail now; we simply give any connection the rights to execute our functions. You can read about access rights separately.


require'strict'.on()
fiber = require 'fiber'
box.cfg{ listen = '127.0.0.1:3301' }
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })
queue = require 'queue'
require'console'.start()
os.exit()

Create a producer client (producer.lua) to generate tasks. Tarantool ships with a module that allows connecting to another Tarantool.


#!/usr/bin/env tarantool
if #arg < 1 then
    error("Need arguments", 0)
end
local netbox = require 'net.box'
local conn = netbox.connect('127.0.0.1:3301')
local yaml = require 'yaml'
local res = conn:call('queue.put', { unpack(arg) })
print(yaml.encode(res))
conn:close()

$ tarantool producer.lua "hi"
--- [1594327270675788959, 'R', ['hi']]
...

The consumer (consumer.lua) connects, calls take with a timeout and processes the result. If it receives a task, we print it and release it; we will not actually process it yet. Let's assume the task has been received.


#!/usr/bin/env tarantool
local netbox = require 'net.box'
local conn = netbox.connect('127.0.0.1:3301')
local yaml = require 'yaml'
while true do
    local task = conn:call('queue.take', { 1 })
    if task then
        print("Got task: ", yaml.encode(task))
        conn:call('queue.release', { task.id })
    else
        print "No more tasks"
    end
end

But when we try to release the task, we get an error.


$ tarantool consumer.lua
Got task: --- [1594327270675788959, 'T', ['hi']]
...
ER_EXACT_MATCH: Invalid key part count in an exact match (expected 1, got 0)

Let's figure it out. When we run the consumer again, we find that on the previous run it took a task but could not return it: it got an error, and the task is stuck. No one else can take such a task, and there is no one to return it, because the code that took it has exited.


$ tarantool consumer.lua
No more tasks
No more tasks

With select you can see that the tasks remain taken.


tarantool> box.space.queue:select()
---
- - [1594327004302379957, 'T', ['task 3']]
  - [1594327270675788959, 'T', ['hi']]
...

There are several problems here, so let's start by automatically releasing tasks when the client disconnects.


Tarantool has triggers for client connect and disconnect. By adding them, we can learn about connections and disconnections.


local log = require 'log'

box.session.on_connect(function()
    log.info("connected %s from %s", box.session.id(), box.session.peer())
end)

box.session.on_disconnect(function()
    log.info("disconnected %s from %s", box.session.id(), box.session.peer())
end)

2020-07-09 20:52:09.107 [32604] main/115/main I> connected 2 from 127.0.0.1:36652
2020-07-09 20:52:10.260 [32604] main/116/main I> disconnected 2 from nil
2020-07-09 20:52:10.823 [32604] main/116/main I> connected 3 from 127.0.0.1:36654
2020-07-09 20:52:11.541 [32604] main/115/main I> disconnected 3 from nil

There is a concept of a session, and you can find out which IP a connection came from and when it disconnected. There is one caveat, though: by the time the disconnect trigger fires, close has essentially already been called on the socket, so we can no longer see who is disconnecting (box.session.peer() is called on a closed socket). Let's do a small hack. Tarantool has box.session.storage, a temporary table where you can keep anything for the duration of the session. On connect, we remember who connected, so that later we know who disconnected. This makes debugging easier.


box.session.on_connect(function()
    box.session.storage.peer = box.session.peer()
    log.info("connected %s from %s", box.session.id(), box.session.storage.peer)
end)

box.session.on_disconnect(function()
    log.info("disconnected %s from %s", box.session.id(), box.session.storage.peer)
end)

Now we have a client-disconnect event, and we need to somehow release the tasks that client has taken. We introduce the concept of task ownership: the session that took a task is responsible for it. We create two tables in which we keep this data, and modify the take function.


queue.taken = {}  -- which session has taken each task
queue.bysid = {}  -- tasks owned by each session

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end
    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    queue.taken[found.id] = sid
    queue.bysid[sid] = queue.bysid[sid] or {}
    queue.bysid[sid][found.id] = true
    return box.space.queue
        :update({ found.id }, { { '=', F.status, STATUS.TAKEN } })
end

In it we remember that a specific task was taken by a specific session. We also need to modify the task-returning code, ack and release. Let's factor out one common function. It verifies that the task exists, is taken, and is taken by this very session. Thus it is impossible to take a task over one connection, then come from another and say: "delete it, I processed it."


local function get_task(id)
    if not id then error("Task id required", 2) end
    local t = box.space.queue:get{id}
    if not t then
        error(string.format("Task {%s} was not found", id), 2)
    end
    if not queue.taken[id] then
        error(string.format("Task %s not taken by anybody", id), 2)
    end
    if queue.taken[id] ~= box.session.id() then
        error(string.format("Task %s taken by %d. Not you (%d)",
            id, queue.taken[id], box.session.id()), 2)
    end
    return t
end

Now the ack and release functions become very simple. In them we call get_task, which checks that the task is taken and belongs to us, and then we work with it.


function queue.ack(id)
    local t = get_task(id)
    queue.taken[t.id] = nil
    queue.bysid[box.session.id()][t.id] = nil
    return box.space.queue:delete{t.id}
end

function queue.release(id)
    local t = get_task(id)
    if queue._wait:has_readers() then
        queue._wait:put(true, 0)
    end
    queue.taken[t.id] = nil
    queue.bysid[box.session.id()][t.id] = nil
    return box.space.queue
        :update({ t.id }, { { '=', F.status, STATUS.READY } })
end

To reset the status of all TAKEN tasks, you can use SQL or a Lua snippet:


box.execute[[ update "queue" set "status"='R' where "status"='T' ]]

box.space.queue.index.status:pairs({'T'}):each(function(t)
    box.space.queue:update({ t.id }, { { '=', 2, 'R' } })
end)

When we run the consumer again, it replies "Task id required".


$ tarantool consumer.lua
Got task: --- [1594327004302379957, 'T', ['task 3']]
...
ER_PROC_LUA: queue.lua:113: Task id required

Here we find the first problem in our code. When we work inside Tarantool, a tuple is always associated with a space; the space has a format, and the format has field names, so tuple fields can be accessed by name. But once a tuple leaves the database, it becomes just an array of fields. Let's finalize the return format of our functions and return not tuples but objects with names. To do this, we use the tomap method:


function queue.put(...)
    --...
    return box.space.queue
        :insert{ id, STATUS.READY, { ... } }
        :tomap{ names_only = true }
end

function queue.take(timeout)
    --...
    return box.space.queue
        :update({ found.id }, { { '=', F.status, STATUS.TAKEN } })
        :tomap{ names_only = true }
end

function queue.ack(id)
    --...
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id)
    --...
    return box.space.queue
        :update({ t.id }, { { '=', F.status, STATUS.READY } })
        :tomap{ names_only = true }
end

return queue

Having changed this, we face a new problem.


$ tarantool consumer.lua
Got task: --- {'status': 'T', 'data': ['hi'], 'id': 1594327270675788959}
...
ER_PROC_LUA: queue.lua:117: Task 1594327270675788959ULL not taken by anybody

When trying to release the task, the system responds that we did not take it, although visually the id is the same, except for some ULL suffix.


Here we run into a feature of the LuaJIT extension FFI (Foreign Function Interface). Let's take a closer look. We put five values into a table, using different ways of writing the number 1 as keys.


tarantool> t={}
tarantool> t[1]=1
tarantool> t["1"]=2
tarantool> t[1LL]=3
tarantool> t[1ULL]=4
tarantool> t[1ULL]=5
tarantool> t
---
- 1: 1
  1: 5
  1: 4
  '1': 2
  1: 3
...

We might assume these collapse into two keys (string and number), or at most three (string, number, LL). But when the table is printed, it turns out that the keys are stored separately: we see all five values 1, 2, 3, 4, 5. Moreover, in the serialized output we cannot tell plain, signed and unsigned numbers apart.


tarantool> return t[1], t['1'], t[1LL], t[1ULL]
---
- 1
- 2
- null
- null
...

But the real fun starts when you try to get the data back out of the table. With normal Lua types everything is fine (t[1] and t['1']), but with LL (t[1LL]) and ULL (t[1ULL]) it is not. These are a separate type, cdata, designed for working with types from the C language. When used as a Lua table key, cdata is hashed by address, not by value: two numbers, even equal in value, have two different addresses. So after adding a ULL key to a table, we cannot retrieve it by an equal value.
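The same identity-versus-value hashing can be reproduced in plain Lua with tables as keys. This is an analogy, not LuaJIT cdata itself: like a cdata box, each table is hashed by its address, so two tables with equal contents are still two distinct keys:

```lua
-- In Lua, object keys (tables, and in LuaJIT also cdata) are hashed
-- by identity, not by value.
local t = {}
local a = { 1 }  -- stands in for one 1ULL box
local b = { 1 }  -- another box holding the same value
t[a] = 'first'
t[b] = 'second'

-- a and b carry equal contents but are different keys:
local distinct = (t[a] ~= t[b])
```

This is exactly why a ULL id received over the network cannot find the entry stored under a different ULL box with the same numeric value.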


Therefore we have to rework task ownership and the keys a little. It is a forced step, but it also lets us change the key format later in an arbitrary way. We need to turn our key into a string or a number. Let's take MessagePack: Tarantool uses it to store tuples, so it will pack our values exactly the way Tarantool itself does. With it we turn an arbitrary key into a string, which becomes the key in our tables.


local msgpack = require 'msgpack'

local function keypack(key)
    return msgpack.encode(key)
end

local function keyunpack(data)
    return msgpack.decode(data)
end

Add the key packing to take and store the packed key in the tables. In the get_task function we verify that the key was passed in the correct format, converting it with tonumber64 if not. After that we use the same keypack to pack the key into MessagePack. Since this packed key is needed by every function that works with the task, get_task returns it so that ack and release can use it and clean it out of the session tables.


function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end
    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack(found.id)
    queue.taken[key] = sid
    queue.bysid[sid] = queue.bysid[sid] or {}
    queue.bysid[sid][key] = true
    return box.space.queue
        :update({ found.id }, { { '=', F.status, STATUS.TAKEN } })
        :tomap{ names_only = true }
end

local function get_task(id)
    if not id then error("Task id required", 2) end
    id = tonumber64(id)
    local key = keypack(id)
    local t = box.space.queue:get{id}
    if not t then
        error(string.format("Task {%s} was not found", id), 2)
    end
    if not queue.taken[key] then
        error(string.format("Task %s not taken by anybody", id), 2)
    end
    if queue.taken[key] ~= box.session.id() then
        error(string.format("Task %s taken by %d. Not you (%d)",
            id, queue.taken[key], box.session.id()), 2)
    end
    return t, key
end

function queue.ack(id)
    local t, key = get_task(id)
    queue.taken[key] = nil
    queue.bysid[box.session.id()][key] = nil
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id)
    local t, key = get_task(id)
    queue.taken[key] = nil
    queue.bysid[box.session.id()][key] = nil
    if queue._wait:has_readers() then
        queue._wait:put(true, 0)
    end
    return box.space.queue
        :update({ t.id }, { { '=', F.status, STATUS.READY } })
        :tomap{ names_only = true }
end

Since we have a disconnect trigger, we now know when a session that owns some keys has disconnected. We can take all of that session's keys and automatically return the tasks to their initial state, READY. Pending take calls may also be hanging inside this session; for them we leave a marker in box.session.storage saying that tasks must no longer be taken.


box.session.on_disconnect(function()
    log.info( "disconnected %s from %s",
        box.session.id(), box.session.storage.peer )
    box.session.storage.destroyed = true
    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id)
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    if queue._wait:has_readers() then queue._wait:put(true, 0) end
                    box.space.queue:update({t.id}, {{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end)

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY}, { iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end
    if box.session.storage.destroyed then return end
    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = found.id
    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end

For the test, you can take tasks with the command:


tarantoolctl connect 127.0.0.1:3301 <<< 'queue.take()' 

While debugging all this, you may run into the following: you take some tasks, shut the queue down, and start it again. The tasks no longer belong to anyone (the connections were broken at shutdown), but they are still in CDMY114CDMY status. Therefore, we add a status fix-up to the startup code: when the database starts, it releases all taken tasks.


while true do
    local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
    if not t then break end
    box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
    log.info("Autoreleased %s at start", t.id)
end

The queue was ready for operation.
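At this point, the full cycle can be exercised from the console. A sketch of such a session (the actual id returned by put must be substituted into the ack call):

tarantool> queue.put('my task')
tarantool> queue.take()
tarantool> queue.ack(id)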


Add deferred processing


It remains to add deferred tasks. To do this, we add a new field and an index on it. This field will store the time at which a task must be transferred to another state. We modify the CDMY115CDMY function and add a new status: CDMY116CDMY.


box.space.queue:format( {
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'runat';  type = 'number' },
    { name = 'data';   type = '*' },
} )

box.space.queue:create_index('runat', {
    parts = { F.runat, 'number', F.id, 'number' };
    if_not_exists = true;
})

STATUS.WAITING = 'W'

Since we are fundamentally changing the schema and this is development mode, we will simply drop the old space (execute this in the console):


box.space.queue:drop()
box.snapshot()

Restart the queue.


In CDMY117CDMY and CDMY118CDMY we add support for CDMY119CDMY. If CDMY120CDMY is passed, we assign the CDMY121CDMY state to the task and compute the point in time at which it must be processed. We also need a handler, and for that we can use background fibers. At any moment you can create a fiber that is not tied to any connection and runs in the background. We create a fiber that loops forever, waiting for the next task.


function queue.put(data, opts)
    local id = gen_id()
    local runat = 0
    local status = STATUS.READY
    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true, 0) end
    end
    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only = true }
end

function queue.release(id, opts)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    local runat = 0
    local status = STATUS.READY
    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true, 0) end
    end
    return box.space.queue
        :update({t.id}, {{ '=', F.status, status }, { '=', F.runat, runat }})
        :tomap{ names_only = true }
end

When the time comes for some task, we modify it: transfer it from the waiting status to the ready status, notifying any clients that may be waiting for a task.


Now we put a task with a delay. We call CDMY122CDMY: there is no ready task yet. We call it again with a timeout long enough for the task to appear. As soon as it appears, we can see in the log that this is the work of the CDMY123CDMY fiber.


queue._runat = fiber.create(function()
    fiber.name('queue.runat')
    while true do
        local remaining
        local now = clock.realtime()
        for _, t in box.space.queue.index.runat
            :pairs( { 0 }, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s", t.id)
                    if queue._wait:has_readers() then queue._wait:put(true, 0) end
                    box.space.queue:update({ t.id }, {
                        {'=', F.status, STATUS.READY },
                        {'=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id }, {{ '=', F.runat, 0 }})
                end
            end
        end
        if not remaining or remaining > 1 then remaining = 1 end
        fiber.sleep(remaining)
    end
end)

Monitoring


We must not forget about monitoring the queue, because the saddest thing that can happen to a queue is that it grows very large, or stops moving altogether.


We can collect statistics from our queue in the simplest, most naive, head-on way: count the tuples in every status there is and start sending the numbers to monitoring.


function queue.stats()
    return {
        total   = box.space.queue:len(),
        ready   = box.space.queue.index.status:count({STATUS.READY}),
        waiting = box.space.queue.index.status:count({STATUS.WAITING}),
        taken   = box.space.queue.index.status:count({STATUS.TAKEN}),
    }
end

tarantool> queue.stats()
---
- ready: 10
  taken: 2
  waiting: 5
  total: 17
...

tarantool> local clock = require 'clock' local s = clock.time() local r = queue.stats() return r, clock.time() - s
---
- ready: 10
  taken: 2
  waiting: 5
  total: 17
- 0.00057339668273926
...

Such monitoring works quite fast, right up to the moment when there are a lot of tasks. The normal state of a queue is empty, but suppose something happens and, say, a million tasks arrive. Our CDMY124CDMY function still returns correct values, but it becomes quite slow. The problem is that CDMY125CDMY always performs a full scan over the index. Let's cache the counter values.


queue._stats = {}
for k, v in pairs(STATUS) do queue._stats[v] = 0LL end
for _, t in box.space.queue:pairs() do
    queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL) + 1
end

function queue.stats()
    return {
        total   = box.space.queue:len(),
        ready   = queue._stats[ STATUS.READY ],
        waiting = queue._stats[ STATUS.WAITING ],
        taken   = queue._stats[ STATUS.TAKEN ],
    }
end

Now this function works very fast regardless of the number of records. It remains to keep the counters up to date on every operation: each operation must decrement one value and increment another. You could, of course, update the counters manually in every function, but that is error-prone and leads to discrepancies. Fortunately, Tarantool has space triggers. A trigger sees every change in the space: even if you execute CDMY126CDMY or CDMY127CDMY by hand, the trigger will account for it. The trigger tracks statuses exactly by the values stored in the database. On restart, we compute all counter values once.


box.space.queue:on_replace(function(old, new)
    if old then
        queue._stats[ old[F.status] ] = queue._stats[ old[F.status] ] - 1
    end
    if new then
        queue._stats[ new[F.status] ] = queue._stats[ new[F.status] ] + 1
    end
end)

There is one more operation that cannot be caught in the space itself but which affects its contents: CDMY128CDMY. Clearing of the space can be tracked with a trigger on a separate system space, CDMY129CDMY.


box.space._truncate:on_replace(function(old, new)
    if new.id == box.space.queue.id then
        for k, v in pairs(queue._stats) do queue._stats[k] = 0LL end
    end
end)

After that, everything works accurately and consistently. Now we can, for example, send the statistics over the network. Tarantool has convenient non-blocking sockets, and you can work with them at quite a low level, almost like in C.


For demonstration, we can send metrics in graphite format via UDP:


local socket = require 'socket'
local errno  = require 'errno'

local graphite_host = '127.0.0.1'
local graphite_port = 2003
local ai = socket.getaddrinfo(graphite_host, graphite_port, 1, { type = 'SOCK_DGRAM' })
local addr, port
for _, info in pairs(ai) do
    addr, port = info.host, info.port
    break
end
if not addr then error("Failed to resolve host") end

queue._monitor = fiber.create(function()
    fiber.name('queue.monitor')
    fiber.yield()
    local remote = socket('AF_INET', 'SOCK_DGRAM', 'udp')
    while true do
        for k, v in pairs(queue.stats()) do
            local msg = string.format("queue.stats.%s %s %s\n",
                k, tonumber(v), math.floor(fiber.time()))
            local res = remote:sendto(addr, port, msg)
            if not res then
                log.error("Failed to send: %s", errno.strerror(errno()))
            end
        end
        fiber.sleep(1)
    end
end)

or over TCP:


local socket = require 'socket'
local errno  = require 'errno'

local graphite_host = '127.0.0.1'
local graphite_port = 2003

queue._monitor = fiber.create(function()
    fiber.name('queue.monitor')
    fiber.yield()
    while true do
        local remote = require 'socket'.tcp_connect(graphite_host, graphite_port)
        if not remote then
            log.error("Failed to connect to graphite %s", errno.strerror())
            fiber.sleep(1)
        else
            while true do
                local data = {}
                for k, v in pairs(queue.stats()) do
                    table.insert(data, string.format("queue.stats.%s %s %s\n",
                        k, tonumber(v), math.floor(fiber.time())))
                end
                data = table.concat(data, '')
                if not remote:send(data) then
                    log.error("%s", errno.strerror())
                    break
                end
                fiber.sleep(1)
            end
        end
    end
end)

Hot code reload


Finally, one cannot fail to mention such an important feature of the Tarantool platform as hot code reload. In ordinary applications this functionality is not in much demand, but when you have gigabytes of data in memory and any restart costs you startup time, it can do you an excellent service. Let's look at what is needed for hot code reload.


When Lua loads some code through CDMY130CDMY, the contents of the file are interpreted and the returned result is cached in the CDMY131CDMY system table under the module name. Subsequent calls to CDMY132CDMY for the same module do not read the file again but return the cached value. To make Lua re-read and reload the file, simply erase the entry from CDMY133CDMY and call CDMY134CDMY again. We must also remember what was preloaded by the runtime itself, because built-in modules have no files to reload from. The simplest snippet handling a reload may look something like this:


require'strict'.on()
fiber = require 'fiber'

box.cfg{ listen = '127.0.0.1:3301' }
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

local not_first_run = rawget(_G, '_NOT_FIRST_RUN')
_NOT_FIRST_RUN = true
if not_first_run then
    for k, v in pairs(package.loaded) do
        if not preloaded[k] then
            package.loaded[k] = nil
        end
    end
else
    preloaded = {}
    for k, v in pairs(package.loaded) do
        preloaded[k] = true
    end
end

queue = require 'queue'

require'console'.start()
os.exit()

Since code reload is a fairly typical, regular task, we already have a ready-made module, package.reload, that we use in the vast majority of our applications. It remembers which file everything was loaded from and which modules were preloaded, and provides a convenient call to initiate a reload: CDMY135CDMY.


require'strict'.on()
fiber = require 'fiber'

box.cfg{ listen = '127.0.0.1:3301' }
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

require 'package.reload'

queue = require 'queue'

require'console'.start()
os.exit()

To make the code reloadable, it needs to be written a little differently, keeping in mind that it may be executed repeatedly: the first time at initial startup, all subsequent times on reload. We need to handle this situation explicitly.


local queue = {}
local old = rawget(_G, 'queue')
if old then
    queue.taken     = old.taken
    queue.bysid     = old.bysid
    queue._triggers = old._triggers
    queue._stats    = old._stats
    queue._wait     = old._wait
    queue._runch    = old._runch
    queue._runat    = old._runat
else
    queue.taken     = {}
    queue.bysid     = {}
    queue._triggers = {}
    queue._stats    = {}
    queue._wait     = fiber.channel()
    queue._runch    = fiber.cond()

    while true do
        local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
        if not t then break end
        box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
        log.info("Autoreleased %s at start", t.id)
    end

    for k, v in pairs(STATUS) do queue._stats[v] = 0LL end
    for _, t in box.space.queue:pairs() do
        queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL) + 1
    end
    log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end

Triggers also need reload handling. If left as is, every reload would install an additional trigger. Fortunately, trigger setters accept the old function as an argument and return the newly installed one. So we save the result of the installation in a variable and pass it back as an argument. On the first start the variable is empty and a new trigger is installed; on subsequent reloads the existing trigger is replaced.


queue._triggers.on_replace = box.space.queue:on_replace(function(old, new)
    if old then
        queue._stats[ old[F.status] ] = queue._stats[ old[F.status] ] - 1
    end
    if new then
        queue._stats[ new[F.status] ] = queue._stats[ new[F.status] ] + 1
    end
end, queue._triggers.on_replace)

queue._triggers.on_truncate = box.space._truncate:on_replace(function(old, new)
    if new.id == box.space.queue.id then
        for k, v in pairs(queue._stats) do queue._stats[k] = 0LL end
    end
end, queue._triggers.on_truncate)

queue._triggers.on_connect = box.session.on_connect(function()
    box.session.storage.peer = box.session.peer()
    log.info( "connected %s from %s", box.session.id(), box.session.storage.peer )
end, queue._triggers.on_connect)

queue._triggers.on_disconnect = box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
    box.session.storage.destroyed = true
    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id)
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    if queue._wait:has_readers() then queue._wait:put(true, 0) end
                    box.space.queue:update({t.id}, {{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end, queue._triggers.on_disconnect)

Another important element of a reload is the fiber. A fiber runs in the background, and we have no control over it: it contains CDMY136CDMY, so it will never finish and will not reload by itself. To interact with it, we need a channel, or better, CDMY137CDMY: a condition variable designed for signaling fibers.
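A minimal sketch of this signaling pattern (hypothetical names; in the queue module the condition variable is queue._runch):

local fiber = require 'fiber'

local cond = fiber.cond()

fiber.create(function()
    while true do
        -- sleep up to 1 second, but wake up immediately on broadcast()
        cond:wait(1)
        -- ... check the stop condition and do the periodic work here ...
    end
end)

-- from the reloading code: wake the fiber so it can re-check its condition
cond:broadcast()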


There are several approaches to reloading fibers. For example, you can destroy the old ones by calling CDMY138CDMY, but this approach is not very safe: CDMY139CDMY may arrive at the wrong moment. Therefore, in most cases we tag each fiber with a generation: the fiber keeps working only while the generation it was created in is current. When the code is reloaded, the generation changes and the old fiber finishes cleanly. We can also protect ourselves from several fibers running at once by checking the status of the previous generation's fiber.


queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)
    while package.reload.count == gen
        and old_fiber and old_fiber:status() ~= 'dead'
    do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end
    log.info("Started...")
    while package.reload.count == gen do
        local remaining
        local now = clock.realtime()
        for _, t in box.space.queue.index.runat
            :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s", t.id)
                    if queue._wait:has_readers() then queue._wait:put(true, 0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id }, {{ '=', F.runat, 0 }})
                end
            end
        end
        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end
    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
queue._runch:broadcast()

And finally: when you reload the code, you will get an error that the console is already running. You can handle this situation as follows:


if not fiber.self().storage.console then
    require'console'.start()
    os.exit()
end

To summarize


We wrote a working network queue with deferred processing, auto-return of tasks via triggers, and statistics sent to Graphite over TCP, and examined quite a few nuances along the way. On average modern hardware, such a queue easily handles 20,000 messages per second. It is about 300 lines of code and was written in a day, documentation study included.


Result Files

CDMY140CDMY:


local clock   = require 'clock'
local errno   = require 'errno'
local fiber   = require 'fiber'
local log     = require 'log'
local msgpack = require 'msgpack'
local socket  = require 'socket'

box.schema.create_space('queue', { if_not_exists = true; })

box.space.queue:format( {
    { name = 'id';     type = 'number' },
    { name = 'status'; type = 'string' },
    { name = 'runat';  type = 'number' },
    { name = 'data';   type = '*' },
} );

local F = {}
for no, def in pairs(box.space.queue:format()) do
    F[no] = def.name
    F[def.name] = no
end

box.space.queue:create_index('primary', {
    parts = { F.id, 'number' };
    if_not_exists = true;
})
box.space.queue:create_index('status', {
    parts = { F.status, 'string', F.id, 'number' };
    if_not_exists = true;
})
box.space.queue:create_index('runat', {
    parts = { F.runat, 'number', F.id, 'number' };
    if_not_exists = true;
})

local STATUS = {}
STATUS.READY   = 'R'
STATUS.TAKEN   = 'T'
STATUS.WAITING = 'W'

local queue = {}
local old = rawget(_G, 'queue')
if old then
    queue.taken     = old.taken
    queue.bysid     = old.bysid
    queue._triggers = old._triggers
    queue._stats    = old._stats
    queue._wait     = old._wait
    queue._runch    = old._runch
    queue._runat    = old._runat
else
    queue.taken     = {}
    queue.bysid     = {}
    queue._triggers = {}
    queue._stats    = {}
    queue._wait     = fiber.channel()
    queue._runch    = fiber.cond()

    while true do
        local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
        if not t then break end
        box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
        log.info("Autoreleased %s at start", t.id)
    end

    for k, v in pairs(STATUS) do queue._stats[v] = 0LL end
    for _, t in box.space.queue:pairs() do
        queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL) + 1
    end
    log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end

local function gen_id()
    local new_id
    repeat
        new_id = clock.realtime64()
    until not box.space.queue:get(new_id)
    return new_id
end

local function keypack( key )
    return msgpack.encode( key )
end

local function keyunpack( data )
    return msgpack.decode( data )
end

queue._triggers.on_replace = box.space.queue:on_replace(function(old, new)
    if old then
        queue._stats[ old[F.status] ] = queue._stats[ old[F.status] ] - 1
    end
    if new then
        queue._stats[ new[F.status] ] = queue._stats[ new[F.status] ] + 1
    end
end, queue._triggers.on_replace)

queue._triggers.on_truncate = box.space._truncate:on_replace(function(old, new)
    if new.id == box.space.queue.id then
        for k, v in pairs(queue._stats) do queue._stats[k] = 0LL end
    end
end, queue._triggers.on_truncate)

queue._triggers.on_connect = box.session.on_connect(function()
    box.session.storage.peer = box.session.peer()
end, queue._triggers.on_connect)

queue._triggers.on_disconnect = box.session.on_disconnect(function()
    box.session.storage.destroyed = true
    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id)
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    if queue._wait:has_readers() then queue._wait:put(true, 0) end
                    box.space.queue:update({t.id}, {{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end, queue._triggers.on_disconnect)

queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)
    while package.reload.count == gen
        and old_fiber and old_fiber:status() ~= 'dead'
    do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end
    log.info("Started...")
    while package.reload.count == gen do
        local remaining
        local now = clock.realtime()
        for _, t in box.space.queue.index.runat
            :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s", t.id)
                    if queue._wait:has_readers() then queue._wait:put(true, 0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id }, {{ '=', F.runat, 0 }})
                end
            end
        end
        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end
    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
queue._runch:broadcast()

local graphite_host = '127.0.0.1'
local graphite_port = 2003
queue._monitor = fiber.create(function(gen)
    fiber.name('queue.mon.'..gen)
    fiber.yield()
    while package.reload.count == gen do
        local remote = require 'socket'.tcp_connect(graphite_host, graphite_port)
        if not remote then
            log.error("Failed to connect to graphite %s", errno.strerror())
            fiber.sleep(1)
        else
            while package.reload.count == gen do
                local data = {}
                for k, v in pairs(queue.stats()) do
                    table.insert(data, string.format("queue.stats.%s %s %s\n",
                        k, tonumber(v), math.floor(fiber.time())))
                end
                data = table.concat(data, '')
                if not remote:send(data) then
                    log.error("%s", errno.strerror())
                    break
                end
                fiber.sleep(1)
            end
        end
    end
end, package.reload.count)

function queue.put(data, opts)
    local id = gen_id()
    local runat = 0
    local status = STATUS.READY
    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true, 0) end
    end
    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only = true }
end

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY}, { iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end
    if box.session.storage.destroyed then return end
    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = found.id
    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end

local function get_task( id )
    if not id then error("Task id required", 2) end
    id = tonumber64(id)
    local key = keypack(id)
    local t = box.space.queue:get{id}
    if not t then
        error(string.format( "Task {%s} was not found", id ), 2)
    end
    if not queue.taken[key] then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end
    if queue.taken[key] ~= box.session.id() then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id, queue.taken[key], box.session.id() ), 2)
    end
    return t, key
end

function queue.ack(id)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id, opts)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    local runat = 0
    local status = STATUS.READY
    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true, 0) end
    end
    return box.space.queue
        :update({t.id}, {{'=', F.status, status }, { '=', F.runat, runat }})
        :tomap{ names_only = true }
end

function queue.stats()
    return {
        total   = box.space.queue:len(),
        ready   = queue._stats[ STATUS.READY ],
        waiting = queue._stats[ STATUS.WAITING ],
        taken   = queue._stats[ STATUS.TAKEN ],
    }
end

return queue

CDMY141CDMY:


require'strict'.on()
fiber = require 'fiber'

require 'package.reload'

box.cfg{ listen = '127.0.0.1:3301' }
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

queue = require 'queue'

if not fiber.self().storage.console then
    require'console'.start()
    os.exit()
end



On July 14th, you can see all this applied in a practical case at Rebrain & Tarantool: Understanding the fail-safe application server Tarantool. More information and registration via the link.


Source