key: cord-0043894-3idyeibv
authors: Grall, Arnaud; Minier, Thomas; Skaf-Molli, Hala; Molli, Pascal
title: Processing SPARQL Aggregate Queries with Web Preemption
date: 2020-05-07
journal: The Semantic Web
DOI: 10.1007/978-3-030-49461-2_14
sha: bf1293211caaf89c1bc0b98b903bf875a9d1f8c5
doc_id: 43894
cord_uid: 3idyeibv

Executing aggregate queries on the web of data makes it possible to compute useful statistics, ranging from the number of properties per class in a dataset to the average lifespan of famous scientists per country. However, processing aggregate queries on public SPARQL endpoints is challenging, mainly because quota enforcement prevents queries from delivering complete results. Existing distributed query engines can go beyond quota limitations, but their data transfer and execution times are clearly prohibitive when processing aggregate queries. Following the web preemption model, we define a new preemptable aggregation operator that allows aggregate queries to be suspended and resumed. Web preemption allows query execution to continue beyond quota limits, and server-side aggregation drastically reduces the data transfer and execution time of aggregate queries. Experimental results demonstrate that our approach outperforms existing approaches by orders of magnitude in terms of execution time and the amount of transferred data.

to deliver final query answers. However, processing aggregate queries with TPF generates tremendous data transfer and delivers poor performance. Recently, the web preemption approach [12] introduced a preemptable server that suspends queries after a quantum of time and resumes them later. The server supports joins, projections, unions, and some filter operators. However, aggregations are not supported natively by the preemptable server. Consequently, the server transfers all required mappings to the smart client, which finally computes groups and aggregation functions locally.
As the size of the mappings is much larger than the size of the final results, this processing of aggregate queries is inefficient. The approach avoids quotas but delivers very poor performance for aggregate queries, and cannot be a sustainable alternative. In this paper, we propose a novel approach for the efficient processing of aggregate queries in the context of web preemption. Thanks to the decomposability of aggregation functions, web preemption allows partial aggregates to be computed on the server side, while the smart client incrementally combines the partial aggregates to compute the final results. The contributions of this paper are the following: (i) We introduce the notion of partial aggregation for web preemption. (ii) We extend the SaGe preemptive server and the SaGe smart client [12] with new algorithms for the evaluation of SPARQL aggregations. The new algorithms use partial aggregations and the decomposability property of aggregation functions. (iii) We compare the performance of our approach with existing approaches for processing aggregate queries. Experimental results demonstrate that the proposed approach outperforms them by orders of magnitude in terms of execution time and the amount of transferred data. This paper is organized as follows. Section 2 reviews related work. Section 3 introduces SPARQL aggregation queries and the web preemption model. Section 4 presents our approach for processing aggregate queries on a preemptive SPARQL server. Section 5 presents experimental results. Finally, conclusions and future work are outlined in Sect. 6.

SPARQL Endpoints. SPARQL endpoints follow the SPARQL protocol 1, which "describes a means for conveying SPARQL queries and updates to a SPARQL processing service and returning the results via HTTP to the entity that requested them". Without quotas, SPARQL endpoints execute queries using a First-Come First-Served (FCFS) execution policy [7].
Thus, by design, they can suffer from the convoy effect [5]: one long-running query occupies the server resources and prevents other queries from executing, leading to long waiting times and a degraded average completion time for queries. To prevent the convoy effect and ensure a fair sharing of resources among end-users, most SPARQL endpoints configure quotas on their servers. They mainly restrict the arrival rate per IP address and limit the execution time of queries. Restricting the arrival rate allows end-users to retry later; limiting the execution time, however, leads some queries to deliver only partial results. Delivering partial results is a serious limitation for public SPARQL services [2, 12, 17]. Centralized Query Answering. Big data processing approaches are able to process aggregate queries efficiently on large volumes of data. Data must first be ingested into a distributed datastore such as HBase [20]; SPARQL queries can then be translated to MapReduce jobs or massively parallelized with parallel scans and joins. Many proposals exist in the semantic web, including [3, 14]. All these approaches require downloading the datasets and ingesting the data on a local cluster to process aggregate queries. Consequently, they require a high-cost infrastructure, which can be amortized only if a large number of aggregate queries have to be executed. Our approach processes aggregate queries on available public servers without copying the data, and delivers exact answers. Query Answering by Samples. Approximate query processing is a well-known approach to speed up aggregate query processing [11]. The approach relies on sampling, synopsis, or sketch techniques to approximate results with bounded errors. The sampling approach proposed in [17] scales to large knowledge graphs and overcomes quotas, but computes approximate query answers. In this paper, we aim to compute the exact results of aggregate queries, not approximate answers. Distributed Query Processing Approaches.
Another well-known approach to overcome quotas is to decompose a query into smaller subqueries that can be evaluated under quotas, and to recombine their results [2]. Such decomposition requires a smart client that performs the decomposition and recombines the intermediate results. In that sense, query processing is distributed between a server and a smart client that collaborate to process SPARQL queries. However, ensuring that subqueries can be completed under quotas remains hard [2]. Another approach is to build servers with a restricted interface that only processes queries that complete within bounded times, i.e., quotas. A smart client interacts with such a server to process full SPARQL queries. The Triple Pattern Fragments (TPF) approach [19] decomposes SPARQL queries into a sequence of paginated triple pattern queries. As paginated triple pattern queries can be executed in bounded time, the server does not need quotas. However, as the TPF server only processes triple pattern queries, joins and aggregates are evaluated on the smart client. This requires transferring all required data from server to client to perform joins, and then computing the aggregation functions locally, which leads to poor query execution performance. Web preemption [12] is another approach for processing SPARQL queries on a public server without quota enforcement. Web preemption allows the web server to suspend a running SPARQL query after a quantum of time and return a link to the smart client. Sending this link back to the web server resumes the query for another quantum of time. Compared to a First-Come First-Served (FCFS) scheduling policy, web preemption provides a fair allocation of server resources across queries, a better average query completion time per query and a better time to first results. However, while web preemption allows projections and joins to be processed server-side, aggregation operators are still evaluated on the smart client.
So, data transfer may be intensive, especially for aggregate queries. In this paper, we extend the web preemption approach to support partial aggregates. Partial aggregates are built during the execution of quanta and sent to the smart client. The smart client recombines the partial aggregates to compute the final results.

We follow the semantics of aggregation as defined in [10] and briefly recall the definitions related to the proposal of this paper. We follow the notation of [10, 13, 16] and consider three disjoint sets I (IRIs), L (literals) and B (blank nodes), and denote by T = I ∪ L ∪ B the set of RDF terms. An RDF triple (s, p, o) ∈ (I ∪ B) × I × T connects a subject s through a predicate p to an object o. An RDF graph G (also called an RDF dataset) is a finite set of RDF triples. We assume the existence of an infinite set V of variables, disjoint from the previous sets. A mapping μ from V to T is a partial function μ : V → T; the domain of μ, denoted dom(μ), is the subset of V where μ is defined. Two mappings μ1 and μ2 are compatible, written μ1 ∼ μ2, if for all ?x ∈ dom(μ1) ∩ dom(μ2), μ1(?x) = μ2(?x). A SPARQL graph pattern expression P is defined recursively as follows. 1. A triple pattern t ∈ (I ∪ L ∪ V) × (I ∪ V) × (I ∪ L ∪ V) is a graph pattern. 2. If P1 and P2 are graph patterns, then the expressions (P1 AND P2), (P1 OPT P2), and (P1 UNION P2) are graph patterns (a conjunctive graph pattern, an optional graph pattern, and a union graph pattern, respectively). 3. If P is a graph pattern and R is a SPARQL built-in condition, then the expression (P FILTER R) is a graph pattern (a filter graph pattern). The evaluation of a graph pattern P over an RDF graph G, denoted [[P]]G, produces a multiset of solution mappings Ω = (SΩ, cardΩ), where SΩ is the base set of mappings and cardΩ is the multiplicity function that assigns a cardinality to each element of SΩ. For simplicity, we often write μ ∈ Ω instead of μ ∈ SΩ.
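The definitions above can be made concrete with a small sketch. Mappings are modeled as Python dicts from variable names to RDF terms; the helper names (`compatible`, `join`) are illustrative, not from the paper.

```python
# Sketch: mappings as Python dicts from variable names to RDF terms.
# Two mappings are compatible if they agree on every shared variable.

def compatible(mu1, mu2):
    """True if mu1 and mu2 map each shared variable to the same term."""
    return all(mu1[v] == mu2[v] for v in mu1.keys() & mu2.keys())

def join(omega1, omega2):
    """Join of two multisets of mappings: merge of every compatible pair."""
    return [{**m1, **m2} for m1 in omega1 for m2 in omega2 if compatible(m1, m2)]

mu1 = {"?x": ":s1", "?c": ":c1"}
mu2 = {"?x": ":s1", "?o": ":o1"}
mu3 = {"?x": ":s2"}

assert compatible(mu1, mu2)      # agree on the shared variable ?x
assert not compatible(mu1, mu3)  # disagree on ?x
assert join([mu1], [mu2, mu3]) == [{"?x": ":s1", "?c": ":c1", "?o": ":o1"}]
```

This is the standard compatibility-based join underlying the AND operator; the evaluation of a conjunctive graph pattern is the join of the multisets produced by its subpatterns.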
The SPARQL 1.1 language [18] introduces new features for supporting aggregation queries: (i) a collection of aggregation functions for computing values, such as COUNT, SUM, MIN, MAX and AVG; (ii) GROUP BY and HAVING, where HAVING restricts the application of aggregation functions to groups of solutions satisfying certain conditions. Both grouping and aggregation deal with lists of expressions E = [E1, ..., En], which evaluate to v-lists: lists of values in T ∪ {error}. More precisely, the evaluation of a list of expressions according to a mapping μ is defined as [[E]]μ = [[[E1]]μ, ..., [[En]]μ]. Inspired by [10, 18], we formalize Group and Aggregate: grouping a multiset of mappings Ω by E produces a set of partial functions from keys to multisets of solution mappings, and the aggregation operator γ(E, F, P) applies the aggregation functions F to each group of the evaluation of P grouped by E. To illustrate, consider the query Q1 of Fig. 1b, which returns the total number of objects per class for subjects connected to the object o1 through a given predicate.

Web Preemption. Web preemption [12] is the capacity of a web server to suspend a running query after a fixed quantum of time and resume the next waiting query. When suspended, the partial results and the state Si of the suspended query are returned to the smart web client 3. The client can resume query execution by sending Si back to the web server. Compared to a First-Come First-Served (FCFS) scheduling policy, web preemption provides a fair allocation of web server resources across queries, a better average query completion time per query and a better time to first results [1]. To illustrate, consider three SPARQL queries Qa, Qb, and Qc submitted concurrently by three different clients. The execution times of Qa, Qb and Qc are respectively 60 s, 5 s and 5 s. Figure 2a presents a possible execution of these queries with a FCFS policy. In this case, the throughput of FCFS is 3/70 ≈ 0.042 queries per second, and the average completion time per query is (60 + 65 + 70)/3 = 65 s.

2 We restrict E to variables, without reducing the expressive power of aggregates [10]. 3 Si can be returned to the client or saved server-side and returned by reference.
Figure 2b presents the execution of Qa, Qb, and Qc using web preemption, with a time quantum of 30 s. Web preemption adds an overhead for the web server to suspend the running query and resume the next waiting one, of about 3 s (10% of the quantum) in our example. In this case, the throughput is 3/76 ≈ 0.039 queries per second, but the average completion time per query is (76 + 38 + 43)/3 ≈ 52.3 s and the average time to first results is approximately (30 + 38 + 43)/3 = 37 s. If the quantum is set to 60 s, then web preemption is equivalent to FCFS. If the quantum is too low, then the throughput and the average completion time deteriorate due to the overhead. Consequently, the challenges with web preemption are to bound the preemption overhead in time and space and to determine a time quantum that amortizes this overhead. To address these challenges, the SPARQL operators are divided in [12] into two categories: mapping-at-a-time operators and full-mappings operators. For mapping-at-a-time operators, the overhead in time and space for suspending and resuming a query Q is bounded by O(|Q| × log(|G|)), where |Q| is the number of operators required to evaluate Q. Graph patterns composed of AND, UNION, PROJECTION, and most FILTERs can be implemented using mapping-at-a-time operators, so this fragment of SPARQL can be efficiently executed by a preemptable web server. Full-mappings operators, such as OPTIONAL, GROUP BY, aggregations, ORDER BY, MINUS and EXISTS, require the full materialization of solution mappings to be executed, so they are executed by smart clients. More generally, to evaluate [[γ(E, F, P)]]G, the smart client first asks a preemptable web server to evaluate [[P]]G = Ω, the server transfers Ω incrementally, and finally the client evaluates γ(E, F, Ω) locally. The main problem with this evaluation is that the size of Ω is usually much bigger than the size of γ(E, F, Ω).
Reducing data transfer requires reducing |[[P]]G|, which is impossible without deteriorating answer completeness. Therefore, the only way to reduce data transfer when processing aggregate queries is to process the aggregation on the preemptable server. However, the operator used to evaluate SPARQL aggregations is a full-mappings operator: as it requires materializing [[P]]G, it cannot be suspended and resumed in constant time. Problem Statement: Define a preemptable aggregation operator γ such that the complexity in time and space of suspending and resuming γ is bounded by a constant 4. Our approach for building a preemptable evaluator for SPARQL aggregations relies on two key ideas: (i) First, web preemption naturally creates a partition of mappings over time. Thanks to the decomposability of aggregation functions [21], we compute partial aggregations on this partition of mappings on the server side and recombine the partial aggregates on the client side. (ii) Second, to control the size of partial aggregates, we can adjust the size of the quantum for aggregate queries. In the following, we present the decomposability property of aggregation functions and how we use this property in the context of web preemption. Traditionally, the decomposability property of aggregation functions [21] ensures the correctness of the distributed computation of aggregation functions [9]. We adapt this property to SPARQL aggregate queries in Definition 3. An aggregation function f is decomposable if, for some grouping expressions E and all non-empty multisets of solution mappings Ω1 and Ω2, there exist a merge operator ⊙, a function h and an aggregation function f1 such that: f(Ω1 ⊎ Ω2) = h(f1(Ω1) ⊙ f1(Ω2)). In the above, ⊎ denotes the multiset union as defined in [10], abusing notation by writing Ω1 ⊎ Ω2 instead of P.
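The decomposability property can be checked on a small example. The sketch below instantiates it for AVG through a SUM-and-COUNT pair: f1 computes a (sum, count) pair per multiset, the merge operator is the point-wise sum of pairs, and h divides the merged pair. The function names are illustrative, not from the paper's implementation.

```python
# Decomposability sketch for AVG: f(Ω1 ⊎ Ω2) = h(f1(Ω1) merged with f1(Ω2)),
# where f1 returns a (sum, count) pair, the merge is a point-wise sum of
# pairs, and h divides sum by count. Names are illustrative.

def sac(values):                      # f1: partial aggregate on one multiset
    return (sum(values), len(values))

def pointwise_sum(p1, p2):            # merge: combine two partial aggregates
    return (p1[0] + p2[0], p1[1] + p2[1])

def avg_final(pair):                  # h: turn the merged pair into the result
    total, count = pair
    return total / count

omega1 = [4, 8]        # values bound during quantum q1
omega2 = [6, 2, 10]    # values bound during quantum q2

merged = pointwise_sum(sac(omega1), sac(omega2))
# AVG over the union equals h applied to the merged partial aggregates.
assert avg_final(merged) == sum(omega1 + omega2) / len(omega1 + omega2)  # 6.0
```

The same shape works for SUM, COUNT, MIN and MAX by choosing the appropriate f1, merge and h, which is exactly what the decomposition table of the paper enumerates.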
Table 1 gives the decomposition of all SPARQL aggregation functions, where Id denotes the identity function and ⊕ is the point-wise sum of pairs, i.e., (x1, y1) ⊕ (x2, y2) = (x1 + x2, y1 + y2). Decomposing SUM, COUNT, MIN and MAX is relatively simple, as we only need to merge the partial aggregation results to produce the final query results. However, decomposing AVG and aggregations with the DISTINCT modifier is more complex. We introduce two auxiliary aggregation functions, called SaC (SUM-and-COUNT) and CT (Collect), respectively. The first collects the information required to compute an average and the second collects a set of distinct values. They are defined as follows: SaC(X) = (SUM(X), COUNT(X)), and CT(X) is the base set of X as defined in Sect. 3. Using a preemptive web server, the evaluation of a graph pattern P over G naturally creates a partition of mappings over time ω1, ..., ωn, where ωi is produced during the quantum qi. Intuitively, a partial aggregation Ai, formalized in Definition 4, is obtained by applying some aggregation functions to a partition of mappings ωi. As a partial aggregation operates on ωi, it can be implemented server-side as a mapping-at-a-time operator. Suspending the evaluation of aggregate queries using partial aggregates does not require materializing intermediate results on the server. Finally, to process the SPARQL aggregation query, the smart client computes [[γ(E, F, P)]]G = h(A1 ⊙ A2 ⊙ · · · ⊙ An). Figure 3a illustrates how a smart client computes Q1 over D1 using partial aggregates. We suppose that Q1 is executed over six quanta q1, ..., q6. At each quantum qi, two new mappings are produced in ωi and the partial aggregate Ai = γ({?c}, {COUNT(?o)}, ωi) is sent to the client. The client merges all Ai using the ⊙ operator and then produces the final results by applying h.
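The per-quantum evaluation described above can be sketched as follows for a COUNT-per-group query in the style of Q1. Each quantum yields a partial aggregate mapping group keys to counts, and the client merges them by adding counts per key. The data and helper names are illustrative, not taken from the paper's figures.

```python
from collections import Counter

# Sketch: each quantum q_i yields a partial aggregate over the mappings ω_i
# produced during that quantum (here, a per-group count). The client merges
# all partial aggregates by adding counts per group key.

def partial_count(omega, group_var="?c"):
    """Partial aggregate over one quantum's mappings: group key -> count."""
    return Counter(mu[group_var] for mu in omega)

quanta = [
    [{"?c": ":c1", "?o": ":o1"}, {"?c": ":c1", "?o": ":o2"}],  # ω1
    [{"?c": ":c3", "?o": ":o3"}, {"?c": ":c1", "?o": ":o4"}],  # ω2
]

result = Counter()
for omega in quanta:                 # one client/server round trip per quantum
    result += partial_count(omega)   # merge partial aggregates per group

assert result == Counter({":c1": 3, ":c3": 1})
```

For COUNT the final function h is the identity, so the merged counter is already the query answer; only one small dictionary per quantum crosses the network instead of every mapping.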
Figure 3b describes the execution of Q2 with partial aggregates under the same conditions. As we can see, the DISTINCT modifier requires transferring more data; however, a reduction in data transfer is still observable compared with transferring all of ω1, ..., ω6. The duration of the quantum seriously impacts query processing using partial aggregations. Suppose that, in Fig. 3a, the server requires twelve quanta of one mapping each instead of six quanta of two mappings; then partial aggregates are useless. If the server requires two quanta of six mappings each, then only two partial aggregates A1 = {(:c3, 3), (:c1, 3)} and A2 = {(:c3, 3), (:c2, 3)} are sent to the client and data transfer is reduced. If the quantum is infinite, then the whole aggregation is produced on the server side and the data transfer is optimal. Globally, for an aggregate query, the larger the quantum, the smaller the data transfer and execution time. However, if we consider several aggregate queries running concurrently (as presented in Fig. 2a), the quantum also determines the average completion time per query, the throughput and the time to first results. The time to first results is not significant for aggregate queries. A large quantum reduces overheads and consequently improves throughput. However, a large quantum degrades the average completion time per query, i.e., the responsiveness of the server, as demonstrated in the experiments of [12]. Consequently, setting the quantum mainly determines a trade-off between the efficiency of partial aggregates, measured in data transfer, and the responsiveness of the server, measured in average completion time per query. The administrator of a public server is responsible for setting the value of the quantum according to the workload and dataset size.
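The extra cost of DISTINCT can be seen in a sketch of the CT (Collect) decomposition: the partial aggregate for a quantum is the set of distinct values observed during that quantum, the merge operator is set union, and h is the set's cardinality. Cross-quantum duplicates survive until the client merges the sets, which is why DISTINCT transfers more data. Data and names are illustrative.

```python
# Sketch: COUNT DISTINCT via the auxiliary CT aggregation. Each quantum's
# partial aggregate is the *set* of values seen during that quantum; the
# client merges sets with union and applies h = cardinality at the end.

def ct(omega, var="?o"):
    """Partial aggregate for one quantum: the set of distinct bindings of var."""
    return {mu[var] for mu in omega}

quanta = [
    [{"?o": ":o1"}, {"?o": ":o2"}],   # ω1
    [{"?o": ":o2"}, {"?o": ":o3"}],   # ω2 (:o2 is a cross-quantum duplicate)
]

collected = set()
for omega in quanta:
    collected |= ct(omega)            # merge = set union

count_distinct = len(collected)       # h = cardinality
assert count_distinct == 3            # :o2 counted once despite two quanta
```

A larger quantum lets the server deduplicate more values before shipping the set, which matches the quantum-size trade-off discussed above.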
This is not a new constraint imposed by web preemption: DBpedia and Wikidata administrators already set their quotas to 60 s for the same reason. We offer them the opportunity to replace a quota that stops query execution with a quantum that suspends query execution. To evaluate SPARQL aggregation queries on the preemptive server SaGe [12], we introduce the preemptable SPARQL aggregation iterator. The new iterator incrementally computes a partial aggregation during a time quantum and then returns the results to the smart client, as shown in Algorithm 1. It can also be suspended and resumed in constant time. When query processing starts, the server calls the Open() method to initialize a multiset of solution mappings G. At each call to GetNext(), the iterator pulls a set of solutions μ from its predecessor (Line 7). Then, it computes the aggregation functions on μ and merges the intermediate results with the content of G (Lines 8-13), using the ⊙ operator. These operations are non-interruptible, because if they were interrupted by preemption, the iterator could end up in an inconsistent state that cannot be saved or resumed. The function Merge(K, A, X, Y) combines two partial aggregation results for the same grouping key, according to the aggregation function used. Finally, in the case of a CT aggregation (Lines 28-29), the two sets of values are merged using the set union operator. When preemption occurs, the server waits for its non-interruptible section to complete and then suspends query execution. This section can block the program for at most the computation of γ on a single set of mappings, which can be done in constant time. Then, the iterator calls the Save() method and sends all partial SPARQL aggregation results to the client. When the iterator is resumed, it restarts query processing where it left off, but with an empty set G, i.e., the preemptable SPARQL aggregation iterator is fully stateless and resuming it is done in constant time.
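The shape of such an iterator can be sketched as follows for a COUNT aggregation. This is a minimal sketch in the spirit of Algorithm 1, not the authors' implementation: GetNext() folds one mapping at a time into an in-memory group table, and Save() flushes the partial aggregate and resets the iterator so that resuming it carries no state.

```python
# Sketch of a preemptable aggregation iterator (illustrative names).
# get_next() is the non-interruptible step: it folds one mapping into the
# per-quantum group table. save() emits the partial aggregate built during
# the quantum and leaves the iterator stateless for resumption.

class PreemptableCountIterator:
    def __init__(self, source, group_var):
        self.source = source          # predecessor iterator yielding mappings
        self.group_var = group_var
        self.groups = {}              # partial aggregate built this quantum

    def get_next(self):
        """Consume one mapping; returns False when the source is exhausted."""
        mu = next(self.source, None)
        if mu is None:
            return False
        key = mu[self.group_var]
        self.groups[key] = self.groups.get(key, 0) + 1
        return True

    def save(self):
        """Called on preemption: emit the partial aggregate, reset local state."""
        partial, self.groups = self.groups, {}
        return partial

it = PreemptableCountIterator(
    iter([{"?c": ":c1"}, {"?c": ":c1"}, {"?c": ":c3"}]), "?c")
while it.get_next():
    pass
assert it.save() == {":c1": 2, ":c3": 1}
assert it.save() == {}   # stateless after suspension: resuming costs nothing
```

Because each get_next() touches a single mapping, preemption only ever waits for one fold to finish, which is the constant-time bound the problem statement asks for.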
We also extend the SaGe smart web client to support the evaluation of SPARQL aggregations using partial aggregates, as shown in Algorithm 2. To execute a SPARQL aggregation query Qγ, the client first rewrites Qγ into a query Q'γ that replaces the AVG aggregation function and the DISTINCT modifier, as described in Sect. 4.2. Then, the client submits Q'γ to the SaGe server S, and follows the next links sent by S to fetch and merge all partial results, following the web preemption model (Lines 6-9). Finally, the client transforms the merged partial aggregation results into the final query results.

We want to empirically answer the following questions: (i) What is the data transfer reduction obtained with partial aggregations? (ii) What is the speed-up obtained with partial aggregations? (iii) What is the impact of the time quantum on data transfer and execution time? We implemented the partial aggregation approach as an extension of the SaGe query engine 5. The SaGe server has been extended with the new operator described in Algorithm 1. The Java SaGe client is implemented using Apache Jena and has been extended with Algorithm 2. All extensions and experimental results are available at https://github.com/folkvir/sage-sparql-void. We built a workload (SP) of 18 SPARQL aggregation queries extracted from the SPORTAL queries [8] (queries without ASK and FILTER). Most of the extracted queries have the DISTINCT modifier. SPORTAL queries are challenging, as they aim to build the VoID description of RDF datasets 6. In [8], the authors report that most queries cannot complete over DBpedia due to quota limitations. To study the impact of DISTINCT on the performance of aggregate query processing, we defined a new workload, denoted SP-ND, by removing the DISTINCT modifier from the queries of SP. We run the SP and SP-ND workloads on synthetic and real-world datasets: the Berlin SPARQL Benchmark (BSBM) with different sizes, and a fragment of DBpedia v3.5.1, respectively. The statistics of the datasets are detailed in Table 2.
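The client-side loop can be sketched as follows, in the spirit of Algorithm 2. The server is stubbed out and all names are illustrative: the stub returns, for each quantum, a partial aggregate and a next link (None when the query is complete), and the client merges partial aggregates per group key as it follows the links.

```python
# Client-side sketch: submit the rewritten query, follow "next" links
# quantum after quantum, merge the partial aggregates per group key, and
# return the merged result. The server is a stub; names are illustrative.

def fake_server(state):
    """Stub server: returns (partial aggregate, next link) for each quantum."""
    pages = [({":c1": 2}, 1), ({":c1": 1, ":c3": 1}, None)]
    return pages[state]

def execute_aggregate_query(server, merge):
    result, state = {}, 0
    while state is not None:                 # follow next links until done
        partial, state = server(state)
        for key, value in partial.items():   # merge per group key
            result[key] = merge(result.get(key), value)
    return result

# For COUNT, merging is addition (first occurrence initializes the key).
final = execute_aggregate_query(
    fake_server, lambda a, b: b if a is None else a + b)
assert final == {":c1": 3, ":c3": 1}
```

For the rewritten AVG and DISTINCT queries, the same loop applies with the SaC point-wise sum or set union as the merge function, followed by a final h step (division or cardinality) before returning results to the user.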
We compare the following approaches:
- SaGe: We run the SaGe query engine [12] with a time quantum of 150 ms and a maximum page size of 5000 mappings. The data are stored in a PostgreSQL server, with indexes on (SPO), (POS) and (OSP).
- SaGe-agg: our extension of SaGe with partial aggregations, run with the same configuration as regular SaGe.
- TPF: We run the TPF server [19] (with no web cache) and the Comunica client, using the standard page size of 100 triples. Data are stored in HDT format.
- Virtuoso: We run the Virtuoso SPARQL endpoint [6] (v7.2.4) without quotas, in order to deliver complete results and optimal data transfer. We also configured Virtuoso with a single thread to compare fairly with the other engines.
We run the experiments on the Google Cloud Platform, on an n1-standard-2 machine: 2 vCPUs, 7.5 GB of memory and a local SSD disk. The presented results correspond to the average of three successive executions of the query workloads. We measure: (i) Data transfer: the number of bytes transferred to the client when evaluating a query. (ii) Execution time: the time between the start of the query and the production of the final results by the client. Data Transfer and Execution Time over BSBM. Figure 4 presents data transfer and execution time for BSBM-10, BSBM-100 and BSBM-1k. The plots on the left detail the results for the SP workload and those on the right the results for the SP-ND workload. Virtuoso with no quota is presented as the optimum in terms of data transfer and execution time. As expected, TPF delivers the worst performance, because it does not support projections and joins on the server side. Consequently, the data transfer is huge even for small datasets. SaGe delivers better performance than TPF, mainly because it supports projections and joins on the server side. SaGe-agg significantly improves data transfer but not execution time.
Indeed, partial aggregations reduce data transfer but do not speed up the scanning of data on disk. When comparing the two workloads, we can see that processing queries without DISTINCT (on the right) is much more efficient in data transfer than with DISTINCT (on the left). For DISTINCT queries, partial aggregations can only remove duplicates observed within a single time quantum, not those observed across the whole execution of the query. Impact of Time Quantum. Figure 5 reports the results of running SaGe, SaGe-agg and Virtuoso with a quantum of 150 ms, 1.5 s and 15 s on BSBM-1k. The plots on the left detail the results for the SP workload and those on the right for the SP-ND workload. As we can see, increasing the quantum significantly improves the execution times of SaGe-agg but not of SaGe. Indeed, SaGe transfers the same number of mappings to the client even with a large quantum. Increasing the quantum reduces data transfer for the SP workload, as a large quantum allows more elements to be deduplicated. Data Transfer and Execution Time over DBpedia. Figure 6 reports the results of running SaGe-agg with the SP-ND workload on a fragment of DBpedia with a quantum of 30 s, compared with Virtuoso. As expected, Virtuoso delivers better performance in data transfer and execution time. Concerning execution time, the difference in performance between Virtuoso and SaGe-agg is mainly due to the lack of query optimisation in the SaGe-agg implementation: no projection push-down, no merge joins. Concerning data transfer, Virtuoso computes the full aggregation on the server, while SaGe-agg performs only partial aggregations. However, Virtuoso cannot ensure the termination of queries under quotas: five queries are interrupted after 60 s. SaGe-agg replaces a quota that stops query execution with a quantum that suspends query execution. Consequently, SaGe-agg ensures the termination of all queries.
In this paper, we demonstrated how the partitioning of mappings produced by web preemption can be used to extend a preemptable SPARQL server with a preemptable aggregation operator. As a large part of the aggregation is now executed on the server side, this drastically reduces data transfer and improves the execution time of SPARQL aggregation queries compared to SaGe and TPF. However, in the current implementation, execution times remain too high for very large knowledge graphs such as Wikidata or DBpedia. Fortunately, there are many ways to improve execution times. First, the current implementation of SaGe has no query optimizer on the server side; just applying state-of-the-art optimisation techniques, including filter and projection push-down, aggregate push-down and merge joins, should greatly improve execution times. Second, web preemption currently does not support intra-query parallelization techniques. Defining how to suspend and resume parallel scans is clearly on our research agenda.

Acknowledgments. This work is partially supported by the ANR DeKaloG (Decentralized Knowledge Graphs) project, program CE23. A. Grall is funded by the GFI Informatique company. T. Minier is partially funded through the FaBuLA project, part of the AtlanSTIC 2020 program.

References
1. Operating Systems: Principles and Practice
2. Strategies for executing federated queries in SPARQL1.1
3. LODStats - an extensible framework for high-performance dataset analytics
4. Linked data - the story so far
5. The convoy phenomenon
6. RDF support in the Virtuoso DBMS
7. R68-47 computer scheduling methods and their countermeasures
8. SPORTAL: profiling the content of public SPARQL endpoints
9. A survey of distributed data aggregation algorithms
10. Query nesting, assignment, and aggregation in SPARQL 1.1
11. Approximate query processing: what is new and where to go?
12. SaGe: web preemption for public SPARQL query services
13. Semantics and complexity of SPARQL
14. S2RDF: RDF querying with SPARQL on Spark. VLDB Endow.
15. Adoption of the linked data best practices in different topical domains
16. Foundations of SPARQL query optimization
17. Anytime large-scale analytics of Linked Open Data
18. SPARQL 1.1 query language
19. Triple pattern fragments: a low-cost knowledge graph interface for the web
20. Hadoop-HBase for large-scale data
21. Eager aggregation and lazy aggregation