MOCHA Open Challenge 2017-2018 – Common API and Implementation Guidelines

Common API

A benchmark for evaluating a SPARQL-based system, as well as the system adapter of that system, should implement the API described below. The general API between a benchmark and a benchmarked system is already described in our GitHub wiki (https://github.com/hobbit-project/platform/wiki/Develop-a-system-adapter). The wiki page describes the command queue as well as the three additional RabbitMQ queues a system is connected to, i.e., the data queue, the task queue and the result queue. (Please have a look at that article before reading on.)

A SPARQL-based system goes through four phases during benchmarking:

    1. Initialization phase
      As every system, it has to initialize itself, e.g., start the components it needs, load configurations, etc. This phase ends as soon as it sends the SYSTEM_READY_SIGNAL on the command queue (as described in the wiki and implemented in the AbstractSystemAdapter in https://github.com/hobbit-project/core/blob/master/src/main/java/org/hobbit/core/components/AbstractSystemAdapter.java).
    2. Loading phase
      Once the system is running and the benchmark has started, the system can receive data from the data queue, which it should load into its triple store. This can be done as a bulk load. The benchmark controller sends a BULK_LOAD_DATA_GEN_FINISHED signal on the command queue when it has finished sending the data of the current bulk load. The message carries an Integer value (4 bytes) as additional data, representing the number of data messages the system adapter should have received. The system adapter should use this number to wait for all messages to arrive. Note that the benchmark controller might have to collect the number of generated data messages from the data generators. In addition, the message contains a flag that states whether this was the last bulk load (true) or whether more data will be sent by the benchmark controller (false). This flag decides whether the system enters the querying phase or waits for additional data. More specifically, the system reads the remaining data from the data queue, bulk loads it into the store and sends a BULK_LOADING_DATA_FINISHED signal on the command queue to tell the benchmark controller that it has finished loading. If the flag of the BULK_LOAD_DATA_GEN_FINISHED command was false, the system waits for the next batch of data, bulk loads it into the store and sends the BULK_LOADING_DATA_FINISHED signal again. If the flag was true, it can proceed to the querying phase. The values of the aforementioned commands are:

      1. BULK_LOADING_DATA_FINISHED = (byte) 150;
      2. BULK_LOAD_DATA_GEN_FINISHED = (byte) 151;
      Data received during this phase is structured as follows:
      1. Integer value (4 bytes) containing the length of the graph URI
      2. Graph URI (UTF-8 encoded String)
      3. NTriples (UTF-8 encoded String; the rest of the package/data stream)
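
The layout above can be illustrated with the JDK's ByteBuffer alone. The following is a minimal sketch, not the platform's own code: the class name DataMessageCodec and both method names are hypothetical, and it assumes that the length prefix counts the bytes (not the characters) of the UTF-8 encoded graph URI and is written in ByteBuffer's default big-endian order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the data message layout;
// it is not part of the HOBBIT core library.
final class DataMessageCodec {

    // Builds: [4-byte length of graph URI][graph URI in UTF-8][N-Triples in UTF-8].
    static byte[] encode(String graphUri, String nTriples) {
        byte[] uri = graphUri.getBytes(StandardCharsets.UTF_8);
        byte[] triples = nTriples.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + uri.length + triples.length)
                .putInt(uri.length)
                .put(uri)
                .put(triples)
                .array();
    }

    // Splits a message into { graph URI, N-Triples }.
    static String[] decode(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte[] uri = new byte[buffer.getInt()];      // length prefix
        buffer.get(uri);                             // graph URI bytes
        byte[] triples = new byte[buffer.remaining()];
        buffer.get(triples);                         // rest of the message
        return new String[] {
            new String(uri, StandardCharsets.UTF_8),
            new String(triples, StandardCharsets.UTF_8)
        };
    }
}
```

Only the graph URI carries a length prefix; the N-Triples part simply runs to the end of the message, which is why the decoder reads "whatever remains" for it.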
      Example workflow:
      1. lastBulkLoad ← false
      2. while lastBulkLoad is false do
      3.      numberOfMessages ← X
      4.      benchmark sends data to system
      5.      if there are no more data for sending then
      6.           lastBulkLoad ← true
      7.      end if
      8.      benchmark sends BULK_LOAD_DATA_GEN_FINISHED { numberOfMessages,lastBulkLoad }
      9.      system loads data
      10.      system sends BULK_LOADING_DATA_FINISHED
      11. done
      12. system enters querying phase
      For benchmarks that measure the time a system takes to load the data, the time between steps 8 and 10 of the example workflow is measured.
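
The additional data of BULK_LOAD_DATA_GEN_FINISHED described above (a 4-byte integer followed by the flag) can be sketched with the JDK alone. The class BulkLoadCommand and its methods are hypothetical. Encoding the flag as a single byte that is non-zero for true is an assumption, chosen to match the `buffer.get() != 0` check used in the implementation guidelines of this document. Note also that Java bytes are signed, so `(byte) 150` is stored as -106 and `(byte) 151` as -105; comparing a received command with the plain int literal 150 would therefore never match.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the command values and their payload;
// this is not the benchmark controller's code.
final class BulkLoadCommand {
    static final byte BULK_LOADING_DATA_FINISHED = (byte) 150;  // stored as -106
    static final byte BULK_LOAD_DATA_GEN_FINISHED = (byte) 151; // stored as -105

    // Assumed payload: 4-byte message count, then one flag byte
    // (non-zero means "this was the last bulk load").
    static byte[] encodePayload(int numberOfMessages, boolean lastBulkLoad) {
        return ByteBuffer.allocate(Integer.BYTES + 1)
                .putInt(numberOfMessages)
                .put((byte) (lastBulkLoad ? 1 : 0))
                .array();
    }

    static int readNumberOfMessages(byte[] payload) {
        return ByteBuffer.wrap(payload).getInt();
    }

    static boolean readLastBulkLoad(byte[] payload) {
        // absolute read of the byte that follows the 4-byte count
        return ByteBuffer.wrap(payload).get(Integer.BYTES) != 0;
    }
}
```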
    3. Querying phase
      During this phase the system can receive two types of input (each query is preceded by its length):

      1. Data from the data queue that should be inserted into the store, in the form of SPARQL INSERT queries.
      2. Tasks from the task queue, i.e., SPARQL queries (SELECT, INSERT, …) that it has to execute. The result of each task (in JSON format) has to be sent to the result queue together with the id of the task.
    4. Termination phase
      As described in the wiki, the querying phase (the third phase) ends when the system receives the TASK_GENERATION_FINISHED command and has consumed all the remaining messages from the task queue. (The AbstractSystemAdapter already contains this step.)

Note that not every task has to make use of all of these possibilities. For example, Task 1 does not need the loading phase, which means that its benchmark can send the BULK_LOAD_DATA_GEN_FINISHED signal to the system directly. Other tasks might not insert additional data during benchmarking, i.e., they do not have to use the data queue during the querying phase. Therefore, please refer to the description of each task, noted below.

Guidelines for MOCHA System Adapter Implementation

In this section, we present the guidelines for implementing a System Adapter that follows the MOCHA Common API. The Java SystemAdapter class of a storage system must extend the abstract class AbstractSystemAdapter.
A prototype of a SystemAdapter for the open source version of Virtuoso can be found here.

A SystemAdapter must override the following methods:

    • public void init() throws Exception {}: this method is responsible for initializing the storage system, by executing a command that starts the system’s Docker container. This method must call super.init().
    • public void receiveCommand(byte command, byte[] data) {}: this method is called when a command is received on the command queue and must be handled by the System Adapter. Here, the sender is the Benchmark Controller and the receiver is the SystemAdapter. The method is responsible for receiving the BULK_LOAD_DATA_GEN_FINISHED signals from the Benchmark Controller; one such signal arrives for every bulk load that has to be performed. With each signal, the Benchmark Controller also sends the number of messages (dataset files) the System Adapter should have received and a flag that states whether this is the last bulk load or whether more data (more bulk loads) will follow. All this information is included in the byte[] data argument. The SystemAdapter can read it using the following commands:
ByteBuffer buffer = ByteBuffer.wrap(data);
int numberOfMessages = buffer.getInt();
boolean lastBulkLoad = buffer.get() != 0;
  • After receiving the BULK_LOAD_DATA_GEN_FINISHED signal, the SystemAdapter executes the bulk phase (Phase 2) for the received data. During the bulk phase, the SystemAdapter should:
    1. Wait until all messages from each Data Generator are received.
    2. Once all messages are received, it should load them into the triplestore.
    3. Send the BULK_LOADING_DATA_FINISHED signal to the command queue for the Benchmark Controller to understand that the current bulk phase of the SystemAdapter is over.
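
Step 1 above has to coordinate two callbacks: data messages arrive in receiveGeneratedData, while the expected count arrives in receiveCommand, possibly on a different thread and in either order. One way to express the wait is a small monitor, sketched below with JDK classes only. The class BulkLoadGate and its method names are hypothetical; this is an alternative to the semaphore-based bookkeeping in the template at the end of this document, not the code the prototype uses.

```java
// Hypothetical helper: blocks the command handler until every
// announced data message has arrived.
final class BulkLoadGate {
    private long received = 0; // data messages seen so far, over all bulk loads
    private long expected = 0; // data messages announced so far

    // To be called once per data message, e.g. from receiveGeneratedData.
    synchronized void onDataMessage() {
        received++;
        notifyAll(); // wake up a waiting command handler, if any
    }

    // To be called when BULK_LOAD_DATA_GEN_FINISHED arrives.
    // Returns once all messages announced so far have been received.
    synchronized void awaitBulk(int numberOfMessages) throws InterruptedException {
        expected += numberOfMessages;
        while (received < expected) {
            wait();
        }
    }

    synchronized long receivedCount() {
        return received;
    }
}
```

Because both counters are only touched while holding the monitor, it does not matter whether the last data message or the command arrives first. After awaitBulk returns, the adapter would load the data and send BULK_LOADING_DATA_FINISHED as described in steps 2 and 3.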
  • public void receiveGeneratedData(byte[] data) {}: this method has two different uses:
      1. In phase 2, it is responsible for receiving the data sent by the data generators and storing them locally so that they are available for loading. Recall from the MOCHA API that byte[] data contains an integer value with the length of the graph URI, the UTF-8 encoded String of the graph URI and the data as a UTF-8 encoded String. We suggest using the following commands in the body of receiveGeneratedData to obtain this information:
    ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    String graphUri = RabbitMQUtils.readString(dataBuffer);
    byte[] dataContentBytes = new byte[dataBuffer.remaining()];
    dataBuffer.get(dataContentBytes, 0, dataBuffer.remaining());
         The graph URI is used for determining the name of the file containing the dataset, or the version into which the received data have to be loaded. Finally, every time receiveGeneratedData is called, the AtomicInteger variable that holds the number of received messages has to be updated.
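
How the received content is stored locally is left to the participant. As one possibility, the sketch below writes each message to a file whose name is derived from the graph URI, using JDK classes only. The class ReceivedDataStore, its methods and the file-naming rule (every character outside A-Z, a-z, 0-9, dot, underscore and hyphen becomes an underscore) are assumptions made for illustration; a real adapter would choose names and a directory that suit the bulk loader of its store.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper that keeps received data on disk until bulk loading.
final class ReceivedDataStore {
    private final Path directory;

    ReceivedDataStore(Path directory) {
        this.directory = directory;
    }

    // Assumed naming rule: make the graph URI safe to use as a file name.
    static String fileNameFor(String graphUri) {
        return graphUri.replaceAll("[^A-Za-z0-9._-]", "_");
    }

    // Writes the content of one data message and returns the file written.
    // A second message with the same graph URI overwrites the first one.
    Path store(String graphUri, byte[] content) throws IOException {
        Files.createDirectories(directory);
        return Files.write(directory.resolve(fileNameFor(graphUri)), content);
    }
}
```

With this rule, two different graph URIs can map to the same file name, so an adapter that expects such collisions would need a different naming scheme.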
      2. In phase 3, it is responsible for receiving the INSERT queries from the data generators. The text of the query can be obtained in the following way:
    ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    String insertQuery = RabbitMQUtils.readString(dataBuffer);
    • public void receiveGeneratedTask(String tId, byte[] data) {}: this method is responsible for receiving SELECT and INSERT SPARQL queries and executing them against the storage system. The parameter tId contains the task ID, as a String, of that particular SELECT/INSERT task, and the parameter data contains the SELECT/INSERT query as a byte array. The participant should use the following commands to get the SPARQL query string:
ByteBuffer dataBuffer = ByteBuffer.wrap(data);
String queryString = RabbitMQUtils.readString(dataBuffer);
    • For a SELECT query, once it has been executed and the system's response has been received, the result set must be converted to JSON that conforms to the W3C standard for SPARQL query results in JSON. If the results of the SELECT query are an instance of the org.apache.jena.query.ResultSet class, the conversion to JSON can be done using the commands:
ByteArrayOutputStream qBos = new ByteArrayOutputStream();
ResultSetFormatter.outputAsJSON(qBos, rs);
    • Once the results are serialised to JSON format, they must be sent to the evaluation storage as byte[] along with the taskID using the following commands:
byte[] results = qBos.toByteArray(); 
sendResultToEvalStorage(tId, results);
    • For INSERT queries, the system adapter must inform the evaluation storage by sending a message to it (e.g. an empty string) along with the taskID using the following commands:
sendResultToEvalStorage(tId, RabbitMQUtils.writeString(""));
  • public void close() throws IOException{}: this method is responsible for shutting down the storage system so it will not receive any further queries. This function must call super.close().

The following template shows a System Adapter that follows the MOCHA API:

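import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import org.hobbit.core.components.AbstractSystemAdapter;
import org.hobbit.core.rabbit.RabbitMQUtils;

// VirtuosoSystemAdapterConstants (used below) is assumed to be a class of the
// adapter itself that defines the two command bytes of the Common API.
public class VirtuosoSysAda extends AbstractSystemAdapter {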
    // a flag indicating if the data loading phase has been finished;
    // volatile because it may be written and read by different threads
    private volatile boolean dataLoadingFinished = false;
    // number of messages received from the data generators
    private AtomicInteger totalReceived = new AtomicInteger(0);
    // number of messages sent by the data generators
    private AtomicInteger totalSent = new AtomicInteger(0);
    // mutex for waiting all the messages from the data generators
    // before loading phase
    private Semaphore allDataReceivedMutex = new Semaphore(0);
    // current loading phase
    private int loadingNumber = 0;

    public VirtuosoSysAda() {
    }

    @Override
    public void receiveGeneratedData(byte[] arg0) {
      if (!dataLoadingFinished) {
        // loading phase: arg0 holds the graph URI followed by the data
        ByteBuffer dataBuffer = ByteBuffer.wrap(arg0);
        String fileName = RabbitMQUtils.readString(dataBuffer);
        byte[] content = new byte[dataBuffer.remaining()];
        dataBuffer.get(content, 0, dataBuffer.remaining());
        // Store the file locally for later bulk loading
        …
        // if all the messages are there, release the mutex
        if (totalReceived.incrementAndGet() == totalSent.get()) {
          allDataReceivedMutex.release();
        }
      } else {
        // querying phase: arg0 holds an INSERT query
        ByteBuffer buffer = ByteBuffer.wrap(arg0);
        String insertQuery = RabbitMQUtils.readString(buffer);
        // process the insert query
        …
      }
    }

    @Override
    public void receiveGeneratedTask(String taskId, byte[] data) {
      ByteBuffer buffer = ByteBuffer.wrap(data);
      String queryString = RabbitMQUtils.readString(buffer);
      if (queryString.contains("INSERT DATA")) {
        // process the insert query and inform the evaluation storage
      … 
      }
      else {
        // process the select query and send the results
        // to the evaluation storage
        … 
      }
    }

    @Override
    public void init() throws Exception {
      super.init();
      // internal initialization
      … 
    }

    @Override
    public void receiveCommand(byte command, byte[] data) {
      if (VirtuosoSystemAdapterConstants.BULK_LOAD_DATA_GEN_FINISHED == command) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int numberOfMessages = buffer.getInt();
        boolean lastBulkLoad = buffer.get() != 0;
        // if all data have already been received when the
        // BULK_LOAD_DATA_GEN_FINISHED command arrives, release before
        // acquire, so that it can immediately proceed to bulk loading
        if (totalReceived.get() == totalSent.addAndGet(numberOfMessages)) {
          allDataReceivedMutex.release();
        }
        // wait for receiving all data for bulk load
        try {
          allDataReceivedMutex.acquire();
        } catch (InterruptedException e) {
          …
        }
        // all data for bulk load received.
        // proceed to the loading and then send the
        // BULK_LOADING_DATA_FINISHED signal on the command queue
        …
        loadingNumber++;
        if (lastBulkLoad) {
          dataLoadingFinished = true;
        }
      }
      super.receiveCommand(command, data);
    }

    @Override
    public void close() throws IOException {
      // internal close
      …
      super.close();
    }
}