key: cord-0462083-x9eax8e1 authors: Pacaci, Anil; Bonifati, Angela; Ozsu, M. Tamer title: Evaluating Complex Queries on Streaming Graphs date: 2021-01-28 journal: nan DOI: nan sha: 8156446f2b96186da13e775b8461e9cda2e45f24 doc_id: 462083 cord_uid: x9eax8e1

We study the problem of evaluating persistent queries over streaming graphs in a principled fashion. These queries need to be evaluated over unbounded and very high speed graph streams. We define a streaming graph data model and query model incorporating navigational queries, subgraph queries and paths as first-class citizens. To support this full-fledged query model we develop a streaming graph algebra that describes the precise semantics of persistent graph queries with their complex constructs. We present transformation rules and describe query formulation and plan generation for persistent graph queries over streaming graphs. Our implementation of a streaming graph query processor shows the feasibility of our approach and allows us to gauge the high performance gains obtained for query processing over streaming graphs.

Many modern applications in various domains now operate on very high speed streaming graphs [62]. For example, Twitter's recommendation system ingests 12K events/sec on average [31], and Alibaba's transaction graph processes 30K edges/sec at its peak [60]. Efficient querying of these streaming graphs is a crucial task for applications that monitor complex graph patterns and relationships. Existing graph DBMSs mostly follow the traditional database paradigm where data is persistent and queries are transient; consequently, they do not support persistent query semantics where queries are registered into the system and results are generated incrementally as the graph edges arrive.
Persistent queries on streaming graphs enable users to continuously obtain new results on rapidly changing data, supporting online analysis and real-time query processing, the latter being an important functionality of future graph processing engines [64]. This is demonstrated in the following example.

Example 1. In many online social networking applications, users post original content, sometimes link it to other users' content, and react to each other's posts. We say that a user $u_2$ is a recentLiker for another user $u_1$ if $u_2$ has recently liked posts that are created by $u_1$, and $u_2$ and $u_1$ are following each other. The goal of the recommendation service is to notify users, in real-time, of new content that is posted by others who are connected by a path of recentLiker relationships; these constraints are modeled as a complex graph pattern such as the one shown in Figure 1. The service might provide the context for its recommendations by returning the full paths of people who are recent likers, such as the path connecting user $u_1$ to another user. This real-time notification task is an example of a persistent query over the streaming graph of user interactions that returns the recommended content in real-time.

Real-world applications that feature complex graph patterns such as the one shown in the above example require:
• (R1) subgraph queries that find matches of a given graph pattern (e.g., in Figure 1, the triangle pattern involving posts, likes and the transitive closure of the follows relationship);
• (R2) path navigation queries that traverse paths based on user-specified constraints (e.g., in Figure 1, arbitrary-length paths of the recentLiker relationship); and
• (R3) the ability to treat paths as first-class citizens of the data model, hence to manipulate and return paths (e.g., in Figure 1, the query returns the full paths of recentLiker).

Even in the context of one-time queries over static graphs, these requirements are poorly addressed by existing graph DBMSs and their languages.
Existing query languages (e.g., PGQL, SPARQL v1.1, Cypher) address the first two requirements by replacing edge labels of a subgraph pattern with regular expressions; this is known as unions of conjunctive RPQs (UCRPQ) [14, 72]. However, UCRPQ lacks algebraic closure and composability, limiting optimization opportunities. Furthermore, the output of a path navigation query is typically a set of pairs of vertices that are connected by a path under the constraints of a given regular expression. Hence, UCRPQ-based query languages limit path navigation queries to boolean reachability, without the ability to return and manipulate paths. G-CORE [6] addresses these limitations at the language specification level and has influenced the standardization efforts for a query language for graph DBMSs (see https://www.gqlstandards.org/). To the best of our knowledge, there is no work that uniformly addresses all three requirements.

The issues become more complex in the context of persistent query processing over streaming graphs, which is the focus of this paper. A number of specialized algorithms focus on evaluating subgraph pattern queries on streaming graphs [5, 18, 35, 44, 60], and our previous work focuses on path navigation queries and introduces the first streaming algorithms for RPQ evaluation [57], a limited subset of UCRPQ. However, a general-purpose model and framework that addresses the above requirements of real-world applications (which feature complex graph patterns) in a uniform and principled manner is currently missing. Querying streaming data in real-time imposes additional and novel requirements:
• (R4) graph streams are unbounded, making it infeasible to employ batch algorithms on the entire stream; and
• (R5) graph edges arrive at a very high rate and real-time answers are required as the graph emerges.

Existing graph DBMSs based on the snapshot model are not able to keep up with the high arrival rates [58].
The unsuitability of existing graph DBMSs for querying streaming data has motivated the design of specialized systems addressing singular application needs (e.g., [18, 31, 35, 44, 60]). The lack of systematic support for query processing over streaming graphs hinders the development of a general-purpose query processor for streaming graphs.

In this paper, we study the design of a general-purpose query processor for streaming graphs that addresses all of the above-discussed requirements in a uniform and principled manner. In analogy to traditional DBMSs, our framework provides the foundational tools to realize the well-known steps of query processing for streaming graph queries as follows: (1) a streaming graph query expressed in a declarative, high-level user language is translated into a query plan that consists of logical operators with precise semantics; (2) algebraic transformation rules are used to explore the plan space through query rewrites to find a "good" one, paving the way for query optimization; (3) the execution plan is built by selecting appropriate physical implementations of logical operators that are incremental and non-blocking; (4) the execution engine continuously executes the persistent query upon arrival of new edges to obtain new results.

Our focus in this paper is on the following key aspects of this framework:

(1) We introduce the formal Streaming Graph Query model (SGQ, Section 4) based on a streaming graph model (Section 3). SGQ is based on the Regular Query (RQ) model, and provides precise semantics for persistent graph queries with subgraph patterns, path navigations and windowing constructs. Most of all, SGQ treats paths as first-class citizens, enabling queries to return and manipulate paths. Moreover, SGQ is closed under transitive closure and composable.
(2) We present the Streaming Graph Algebra (SGA, Section 5.1) as a foundational basis for evaluating SGQ and provide an algorithm for translating SGQ into SGA expressions (Section 5.2). Utilizing SGA's closedness (Section 5.3), we introduce transformation rules for novel SGA operators for systematic exploration of the plan space (Section 5.4).

(3) We describe query execution plans (Section 6.1) and non-blocking physical operator implementations (Section 6.2).

(4) We describe a prototype implementation of a streaming graph query processor based on Timely Dataflow [54], i.e., the dataflow computational model (Section 6).

Unlike existing work on streaming graphs that relies on ad hoc algorithms, our algebraic approach provides the foundational framework to precisely describe the semantics of complex persistent graph queries over streaming graphs and to optimize such queries by query optimizers. SGA unifies path navigation and subgraph pattern queries in a structured manner (R1 & R2), i.e., it attains composability by properly closing UCRPQ under recursion. In addition, our framework treats paths as first-class citizens of its data model, enabling the proposed algebra to express queries that return and manipulate paths (R3). Extensive experimental analysis over real and synthetic datasets of our prototype implementation, which incorporates incremental, non-blocking algorithms as physical operators (R4 & R5), demonstrates the feasibility and the performance gains of our approach. To the best of our knowledge, this is the first work to study the design of a streaming graph query processor, and to introduce an end-to-end solution to evaluating streaming graph queries with complex constructs.

Early research on stream processing primarily adapts the relational model and its query operators to the streaming setting (e.g., STREAM [8], Aurora [2], Borealis [1]).
In contrast, modern Data Stream Processing Systems (DSPS) such as Storm [68], Heron [40] and Flink [17] do not necessarily offer a full set of DBMS functionality. Existing literature (as surveyed by Hirzel et al. [33]) focuses heavily on general-purpose systems and does not consider core graph querying functionality such as subgraph pattern matching and path navigation.

Existing work on streaming graph systems, by and large, targets graph analytics workloads with little or no focus on the graph querying functionalities on which we focus in this paper (e.g., STINGER [21], GraphOne [41, 42], GraphIn [66], GraphTau [34], GraPu [67], GraphBolt [49]). The primary focus of these systems is to build and maintain a sequence of snapshots for iterative graph analytics workloads under a stream of updates.

SPARQL extensions have also been proposed for persistent query processing over RDF streams (e.g., C-SPARQL [12], CQELS [43], SPARQL [16] and RSP-QL [20]). These are the most similar to our setting, but they are designed for SPARQL v1.0 and cannot formulate path expressions such as RPQs, which cover more than 99% of all recursive queries found in massive Wikidata query logs [15]. Also, query processors of these systems do not employ incremental operators, except Sparkwave [37], which focuses on stream reasoning. Our proposed framework supports complex graph patterns arising in existing graph query languages, including SPARQL v1.1 property paths, and introduces non-blocking operators optimized for streaming workloads.

A persistent query over sliding windows can be formulated as an incremental view maintenance (IVM) problem, where the view definition is the query itself and window movements are updates to the underlying database. Although conceptually similar, the algorithms (e.g., the classical Counting [32] algorithm) and systems (e.g., DBToaster [36], F-IVM [56], ViewDF [74]) in this category primarily target non-recursive queries.
General-purpose IVM techniques for recursive queries (such as DRed [32] and Absorption Provenance [48] ) ignore the structure of graph queries and the inherent temporal patterns of streaming graphs, resulting in significant computational overhead to compute and store a large number of derivations. Differential Dataflow (DD) [51] is a state-of-the-art distributed system for incremental maintenance of possibly cyclic dataflows. Graphsurge [63] uses DD as the underlying execution layer to share computation across multiple, possibly recursive views over static graphs. As we show in this paper (Section 6.2), DD can also be used for evaluating SGQ. Our framework, in contrast to these, exploits the structure of graph queries and the temporal patterns of sliding windows to minimize the cost of evaluating complex queries on streaming graphs (Section 7.2.2). TriAL [45] and Temporal Graph Algebra (TGA) [53] adopt an algebraic approach to graph query processing, similar to ours. TriAL is designed to be used for one-time navigational queries over static triplestores, and it cannot be used as a standalone graph query language. In contrast, our proposed SGA is designed as a standalone language for persistent graph queries over streaming graphs, and it can express complex graph patterns expressed in high-level user languages (Section 4.2). TGA extends temporal relational operators to PGM, and its Spark implementation introduces physical operator implementations for its algebraic primitives [4] . However, it is designed for exploratory graph analytics over the entire history of changes. In contrast, our framework is designed to continuously evaluate graph queries as the underlying (potentially unbounded) streaming graph changes. The closest to ours are specialized algorithms on dynamic and streaming graphs [5, 18, 35, 44, 57, 60] . Some of these [18, 35] study the incremental evaluation of subgraph pattern queries. 
Their focus is developing efficient incremental algorithms to maintain matches of a given subgraph pattern as the underlying graph changes. Ammar et al. [5] present distributed worst-case-optimal join algorithms for subgraph pattern queries. Li et al. [44] study subgraph isomorphism search over streaming graphs with timing-order constraints. GraphS [60] introduces efficient index structures that are optimized for cycle detection queries. In previous work [57], we study the design space of algorithms for path navigation over streaming graphs and provide algorithms for persistent evaluation of RPQ, a subset of the class of queries that we address in this paper. In contrast, we target a general query model that captures the precise semantics of streaming graph queries.

Definition 1 (Directed Labeled Graph). A directed labeled graph is a quintuple $G = (V, E, \Sigma, \psi, \phi)$ where $V$ is a set of vertices, $E$ is a set of edges, $\Sigma$ is a set of labels, $\psi : E \to V \times V$ is an incidence function and $\phi : E \to \Sigma$ is an edge labelling function.

Definition 2 (Path and Path Label). Given $u, v \in V$, a path $p$ from $u$ to $v$ in graph $G$ is a sequence of edges $u \xrightarrow{p} v : \langle e_1, \cdots, e_n \rangle$. The label sequence of a path $p$ is defined as the concatenation of edge labels, i.e., $\phi^p(p) = \phi(e_1) \cdots \phi(e_n) \in \Sigma^*$.

We use $\mathcal{T} = (T, \leq)$ to define a discrete, totally ordered time domain and use timestamps $t \in \mathcal{T}$ to denote time instants. Without loss of generality, the rest of the paper uses non-negative integers to represent timestamps.

Definition 3 (Streaming Graph Edge). A streaming graph edge (sge) is a quadruple $(src, trg, l, t)$ where $src, trg \in V$ are endpoints of an edge $e \in E$, $l \in \Sigma$ is the label of the edge $e$, and $t \in \mathcal{T}$ is the event (application) timestamp assigned by the source, i.e., $\psi(e) = (src, trg)$ and $\phi(e) = l$ (footnote 2).

Definition 4 (Input Graph Stream). An input graph stream is a continuously growing sequence of streaming graph edges $S^I = [sge_1, sge_2, \cdots]$ where each sge represents an edge in graph $G$ and sges are non-decreasingly ordered by their timestamps (footnote 3).

Input graph streams represent external sources that provide the system with the graph-structured data.
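To make Definitions 2 to 4 concrete, the following is a minimal sketch in Python. The class name `SGE` and the helpers `input_graph_stream` and `path_label` are hypothetical names introduced for illustration only; they are not part of the prototype described in this paper. The sketch assumes integer timestamps and in-order arrival, as stated in the text.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass(frozen=True)
class SGE:
    """Streaming graph edge (Definition 3): endpoints, label, event timestamp."""
    src: str
    trg: str
    label: str
    ts: int  # non-negative integer timestamp, as assumed in the text


def input_graph_stream(edges: Iterable[SGE]) -> Iterator[SGE]:
    """Yield sges while checking the ordering required by Definition 4."""
    last_ts = None
    for sge in edges:
        if last_ts is not None and sge.ts < last_ts:
            raise ValueError(f"out-of-order sge: {sge.ts} < {last_ts}")
        last_ts = sge.ts
        yield sge


def path_label(path: List[SGE]) -> List[str]:
    """Label sequence of a path (Definition 2): the concatenated edge labels."""
    for prev, nxt in zip(path, path[1:]):
        if prev.trg != nxt.src:
            raise ValueError("consecutive edges of a path must be connected")
    return [sge.label for sge in path]
```

Equal timestamps are accepted because the definition only requires a non-decreasing order; rejecting out-of-order edges mirrors the assumption that out-of-order arrival is left as future work.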
Our proposed framework uses a different format that generalizes Definition 4 to also represent intermediate results and outputs of persistent queries (Definition 7).

Definition 5 (Validity Interval). A validity interval is a half-open time interval $[ts, exp)$ consisting of all distinct time instants $t \in \mathcal{T}$ for which $ts \leq t < exp$.

Timestamps are commonly used to represent the time instant at which the interaction represented by the sge occurred [44, 57, 60]. Alternatively, we use intervals to represent the period of validity of sges. In this paper, we argue that using validity intervals leads to a succinct representation and simplifies operator semantics by separating the specification of window constructs from operator implementation. As an example, each sge with timestamp $t$ can be assigned a validity interval $[t, t + 1)$ that corresponds to a single time unit of the smallest granularity, which cannot be decomposed into smaller time units (footnote 4). Similarly, an sge $(src, trg, l, [ts, exp))$ with a validity interval is equivalent to a set of sges $\{(src, trg, l, t_1), \cdots, (src, trg, l, t_n)\}$ where $t_1 = ts$ and $t_n = exp - 1$. Windowing operators (precisely defined in Section 5.1) are used to assign validity intervals based on the windowing specifications of a given query.

We now describe the logical representation of streaming graphs that is used throughout the paper. First, we extend the directed labeled graph model with materialized paths to represent paths as first-class citizens of the data model. As per Definition 2, a path $p$ between vertices $u$ and $v$ is a sequence of edges $u \xrightarrow{p} v : \langle e_1, \cdots, e_n \rangle$ that connects vertices $u$ and $v$, i.e., the path defines a higher-order relationship between vertices $u$ and $v$ through a sequence of edges. By treating paths as first-class citizens like vertices and edges, the materialized path graph model enables queries to have paths as inputs and outputs. In addition, it enables edges and paths to be stitched together to form complex graph patterns, as will be shown in Section 5.1.

Definition 6 (Materialized Path Graph).
A materialized path graph is a 7-tuple $G = (V, E, P, \Sigma, \psi, \rho, \phi)$ where $V$ is a set of vertices, $E$ is a set of edges, $P$ is a set of paths, $\Sigma$ is a set of labels, $\psi : E \to V \times V$ is an incidence function, $\rho : P \to E \times \cdots \times E$ is a total function that assigns to each path its sequence of edges, and $\phi : (E \cup P) \to \Sigma$ is a labelling function. The function $\rho$ assigns to each $p : u \to v \in P$ an actual path $\langle e_1, \cdots, e_n \rangle$ in graph $G$ satisfying: for every $i \in [1, \cdots, n)$, $\psi(e_i) = (x_i, y_i)$, $y_i = x_{i+1}$, and $x_1 = u$, $y_n = v$.

The materialized path graph is a strict generalization of the directed labeled graph model (Definition 1), i.e., each directed labeled graph is also a materialized path graph where $P = \emptyset$. We now generalize the notion of streaming graph edges (Definition 3) as follows:

Definition 7 (Streaming Graph Tuple). A streaming graph tuple (sgt) is a quintuple $t = (src, trg, l, [ts, exp), \mathcal{D})$ where the distinguished (explicit) attributes $src, trg \in V$ are endpoints of an edge $e \in E$ or a path $p \in P$ in graph $G$ and $l \in \Sigma$ is its label, and the non-distinguished (implicit) attributes are $[ts, exp) \in \mathcal{T} \times \mathcal{T}$, a half-open time interval representing $t$'s validity, and $\mathcal{D}$, a payload that consists of the edges in $G$ that participated in the generation of the tuple $t$.

Streaming graph tuples generalize sges (Definition 3) to represent, in addition to input graph edges, derived edges (new edges produced as operator and query results that are not necessarily part of the input graph) and paths (sequences of edges produced as operator and query results). We use the notation $E_I \subset E$ to denote the set of input graph edges, and $\phi(E_I)$ to denote the fixed set of labels that are reserved for input graph edges. Additionally, the non-distinguished (implicit) attribute $\mathcal{D}$ of an sgt captures the path $p$, i.e., the sequence of edges, in case the sgt represents a path. Otherwise, $\mathcal{D}$ is the edge that the sgt represents.

Definition 8 (Streaming Graph). A streaming graph $S$ is a continuously growing sequence of streaming graph tuples $S = [t_1, t_2, \cdots]$ in which each tuple $t_i$ arrives at a particular time $t_i$ ($t_i < t_j$ for $i < j$).

Footnote 2: We assume that sges are generated by a single source and arrive in order, and leave out-of-order arrival as future work.
Footnote 3: We use $[\,]$ to denote ordered streams throughout the paper.
Footnote 4: Commonly referred to as NOW windows, as described in Section 5.1.
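The streaming graph tuple and its validity interval can be sketched as a small Python data structure. The names `SGT`, `now_window` and `expand` are hypothetical and chosen for illustration; the sketch assumes integer timestamps and represents the payload as a tuple of `(src, trg, label)` edges, which is one possible encoding rather than the one used by the prototype.

```python
from dataclasses import dataclass
from typing import List, Tuple

Edge = Tuple[str, str, str]  # (src, trg, label)


@dataclass(frozen=True)
class SGT:
    """Streaming graph tuple: distinguished attributes plus validity and payload."""
    src: str
    trg: str
    label: str
    ts: int                    # start of the half-open validity interval
    exp: int                   # end (exclusive) of the validity interval
    payload: Tuple[Edge, ...]  # a single edge, or the edge sequence of a path

    def __post_init__(self) -> None:
        if not self.ts < self.exp:
            raise ValueError("validity interval [ts, exp) must be non-empty")

    def valid_at(self, t: int) -> bool:
        """True iff ts <= t < exp."""
        return self.ts <= t < self.exp


def now_window(src: str, trg: str, label: str, t: int) -> SGT:
    """Wrap a timestamped edge with the unit validity interval [t, t + 1)."""
    return SGT(src, trg, label, t, t + 1, ((src, trg, label),))


def expand(sgt: SGT) -> List[Tuple[str, str, str, int]]:
    """Equivalent set of timestamped edges, one per instant in [ts, exp)."""
    return [(sgt.src, sgt.trg, sgt.label, t) for t in range(sgt.ts, sgt.exp)]
```

An sgt that represents a path would carry every edge of that path in `payload`, while its `src` and `trg` are the first and last vertices of the path.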
[Figure 3: a streaming graph whose validity intervals are assigned by the WSCAN operator (see Section 5.1). Validity intervals recoverable from the figure: [7, 31), [10, 34), [13, 37), [17, 41), [22, 46), [28, 52), [29, 53), [30, 54).]

Unless otherwise specified, we consider streaming graphs to be append-only, i.e., each sgt represents an insertion, and use the direct approach to process expirations due to window movements. Explicit deletions of previously arrived sgts can be supported by explicitly manipulating the validity interval of a previously arrived sgt [39]. This corresponds to the negative tuple approach [25, 29]. Section 6.2 describes the processing of insertions, deletions and expirations under alternative window semantics for physical operator implementations.

Definition 9 (Logical Partitioning). A logical partitioning of a streaming graph $S$ is a label-based partitioning of its tuples and it produces a set of disjoint streaming graphs $\{S_{l_1}, \cdots, S_{l_n}\}$ where each $S_{l_i}$ consists of the sgts of $S$ with the label $l_i$, i.e., $S = \bigcup_{l \in \Sigma} S_l$.

This label-based partitioning of streaming graphs provides a coherent representation for inputs and outputs of operators in the logical algebra (Section 5.1). At the logical level, it can be performed by the filter operator of the logical algebra (Definition 17), and logical operators of our algebra process logically partitioned streaming graphs as their inputs and outputs unless otherwise specified.

Definition 10 (Value-Equivalence). Sgts $t_1 = (src_1, trg_1, l_1, [ts_1, exp_1), \mathcal{D}_1)$ and $t_2 = (src_2, trg_2, l_2, [ts_2, exp_2), \mathcal{D}_2)$ are value-equivalent iff their distinguished attributes are equal, i.e., they both represent the same edge or the same path, possibly with different validity intervals. Formally, $src_1 = src_2 \wedge trg_1 = trg_2 \wedge l_1 = l_2$.

Value-equivalence is used for temporal coalescing of tuples with adjacent or overlapping validity intervals [47]. We extend the coalesce primitive from the temporal database literature [19] to sgts with an aggregation function over the non-distinguished payload attribute, $\mathcal{D}$, as shown below:

Definition 11 (Coalesce Primitive).
The coalesce primitive merges a set of value-equivalent sgts $\{t_1, \cdots, t_n\}$, $t_i = (src, trg, l, [ts_i, exp_i), \mathcal{D}_i)$ for $1 \leq i \leq n$ with overlapping or adjacent validity intervals using an operator-specific aggregation function $f_{agg}$ over the payload attribute $\mathcal{D}$:

$$coalesce_{f_{agg}}(\{t_1, \cdots, t_n\}) = \big(src, trg, l, [\min_{1 \leq i \leq n} ts_i, \max_{1 \leq i \leq n} exp_i), f_{agg}(\mathcal{D}_1, \cdots, \mathcal{D}_n)\big)$$

The distinguished attributes $src$, $trg$ and the label $l$ of sgts in a streaming graph represent the topology of a materialized path graph. Hence, a finite subset of a streaming graph corresponds to a materialized path graph over the set of edges and paths that are in the streaming graph and the set of vertices that are adjacent to these. We now use this to define snapshot graphs and the property of snapshot reducibility.

Definition 12 (Snapshot Graph). A snapshot graph of a streaming graph $S$ is defined by a mapping from each time instant in $\mathcal{T}$ to a finite set of sgts in $S$. At any given time $t$, the content of the mapping $\tau_t(S)$ defines a snapshot graph $G_t = (V_t, E_t, P_t, \Sigma, \psi, \rho, \phi)$ where $E_t$ is the set of all edges that are valid at time $t$, $P_t$ is the set of all paths that are valid at time $t$, and $V_t$ is the set of all vertices that are endpoints of edges and paths in $E_t$ and $P_t$, respectively.

Value-equivalence (Definition 10) and the coalesce primitive (Definition 11) ensure that snapshot graphs have set semantics, i.e., at any point in time $t$, each vertex, edge and path exists at most once in the snapshot graph $G_t$ of a streaming graph $S$.

This section presents our streaming graph query (SGQ) model. We first provide a formal definition of SGQ using Datalog, which enables us to specify precise SGQ semantics and to reason about its expressiveness. We then describe how SGQ captures a significant subset of existing graph query languages and provide concrete examples of how to formulate SGQ using a slight extension of G-CORE.

We formally describe SGQ based on a streaming generalization of the Regular Query (RQ) model [61]. Informally, RQ corresponds to the binary, non-recursive subset of Datalog with transitive closure and provides a principled way to combine subgraph patterns and path navigations.
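Returning to the coalesce primitive (Definition 11) defined above, the following Python sketch shows one way it could be realized. The names `SGT` and `coalesce` are hypothetical, and the default aggregation function (tuple concatenation of payloads) is an assumption made for illustration; the text only states that the aggregation is operator-specific.

```python
from typing import Callable, Dict, Iterable, List, NamedTuple, Tuple


class SGT(NamedTuple):
    src: str
    trg: str
    label: str
    ts: int         # start of validity interval [ts, exp)
    exp: int        # end (exclusive)
    payload: tuple  # edges that produced this tuple


def coalesce(
    sgts: Iterable[SGT],
    agg: Callable[[tuple, tuple], tuple] = lambda a, b: a + b,  # assumed default
) -> List[SGT]:
    """Merge value-equivalent sgts whose intervals overlap or are adjacent."""
    groups: Dict[Tuple[str, str, str], List[SGT]] = {}
    for t in sgts:
        # Value-equivalence: equal distinguished attributes (src, trg, label).
        groups.setdefault((t.src, t.trg, t.label), []).append(t)

    result: List[SGT] = []
    for key, group in groups.items():
        group.sort(key=lambda t: (t.ts, t.exp))
        current = group[0]
        for nxt in group[1:]:
            if nxt.ts <= current.exp:  # overlapping or adjacent intervals
                current = SGT(*key, current.ts, max(current.exp, nxt.exp),
                              agg(current.payload, nxt.payload))
            else:                      # gap between intervals: keep both
                result.append(current)
                current = nxt
        result.append(current)
    return result
```

Value-equivalent tuples separated by a gap are deliberately left unmerged, since at no time instant inside the gap is the edge or path valid; this keeps every snapshot free of duplicates without extending validity.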
RQ provides a good basis for building a general-purpose framework for persistent query evaluation over streaming graphs, because (i) unlike UCRPQ, it is closed under transitive closure and therefore composable, (ii) it has more expressive power than existing graph query languages such as SPARQL v1.1, Cypher and PGQL, since RQ strictly subsumes UCRPQ on which these are based, and (iii) its query evaluation and containment complexity is reasonable [61]. Due to its well-defined semantics and computational behaviour, RQ has been gaining popularity as a logical foundation for graph queries, both in theory [13, 14] and in practice [6]. Indeed, RQ captures the core of the contemporary graph query language G-CORE that we use throughout this paper.

[Figure 5: Snapshot reducibility (adapted from [39]).]

Definition 13 (Regular Queries (RQ), following [61]). The class of Regular Queries is the subset of non-recursive Datalog with a finite set of rules where each rule is of the form (footnote 5)

$$head \leftarrow body_1, \cdots, body_n$$

where each body predicate is either a binary predicate $l(x, y)$ for a label $l \in \Sigma$, or $l^{+}(x, y)$ as $d$, which is a transitive closure over $l(x, y)$ for a label $l \in \Sigma$, $d \in \Sigma \setminus \phi(E_I)$, and each head predicate ($head_i$) is a binary predicate $d(x, y)$ for a label $d \in \Sigma \setminus \phi(E_I)$, except the reserved answer predicate, which is not in $\Sigma$ and has an arbitrary arity.

In other words, an RQ is a binary, non-recursive Datalog program extended with the transitive closure of binary predicates, where input graph edges with a label $l \in \phi(E_I)$ correspond to instances of the extensional schema (EDB) and derived edges and paths with a label $d \in \Sigma \setminus \phi(E_I)$ correspond to instances of the intensional schema (IDB). EDBs are predicates that appear only on the right-hand side of the rules, which correspond to stored relations in Datalog [3]. Similarly, we define IDBs as predicates that appear in the rule heads, which correspond to output relations in Datalog.

Example 2 (Regular Query). Consider the real-time notification query in Example 1 and its graph pattern in Figure 1.
The one-time query for the same graph pattern corresponds to an RQ whose predicates represent the labels likes, follows, followsPath, post, recentLiker and recentLikerPath.

Next, we define the notion of snapshot reducibility that enables us to precisely define the semantics of streaming queries and operators using their non-streaming counterparts. Snapshot reducibility is used in temporal databases to generalize non-temporal queries and operators to temporal ones [19].

Definition 14 (Snapshot-Reducibility). Let $S$ be a streaming graph, $Q^S$ a streaming graph query and $Q^O$ its non-streaming, one-time counterpart. Snapshot reducibility states that each snapshot of the result of applying $Q^S$ to $S$ is equal to the result of applying its non-streaming version $Q^O$ over the corresponding snapshots of $S$, i.e., $\forall t \in \mathcal{T}, \tau_t(Q^S(S)) = Q^O(\tau_t(S))$.

Following existing research [27], we define the semantics of persistent evaluation of SGQ using the notion of snapshot reducibility (Definition 14). It is known that for many operations such as joins and aggregation, exact results cannot be computed with finite memory over unbounded streams [9]. In streaming systems, a common solution for bounding the space requirement is to evaluate queries on a window of data from the stream. The windowed evaluation model provides a tool to process unbounded streams with bounded memory, and restricts the scope of queries to recent data, a desired feature in many applications [9, 27].

Footnote 5: The dependency graph of a Datalog program is a directed graph whose vertices are its predicates and whose edges represent dependencies between predicates, i.e., there is an edge from $q$ to $p$ if $q$ appears in the body of a rule with head predicate $p$. A Datalog program is non-recursive iff its dependency graph is acyclic, i.e., no predicate depends recursively on itself.
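Snapshot reducibility (Definition 14) suggests a simple reference semantics that can be used to check any streaming evaluation: take the snapshot at each time instant and run the one-time query on it. The Python sketch below illustrates this with a transitive-closure query over `follows` edges. The function names (`snapshot`, `transitive_closure`, `reevaluate`) and the sample data are hypothetical, and payloads are omitted to keep the example short.

```python
from typing import Callable, Dict, Iterable, List, NamedTuple, Set, Tuple

Triple = Tuple[str, str, str]  # (src, trg, label)


class SGT(NamedTuple):
    src: str
    trg: str
    label: str
    ts: int   # validity interval [ts, exp)
    exp: int


def snapshot(stream: Iterable[SGT], t: int) -> Set[Triple]:
    """Edges of the stream that are valid at time t (set semantics)."""
    return {(s.src, s.trg, s.label) for s in stream if s.ts <= t < s.exp}


def transitive_closure(graph: Set[Triple], label: str, out_label: str) -> Set[Triple]:
    """One-time query: pairs connected by one or more `label` edges."""
    pairs = {(u, v) for (u, v, l) in graph if l == label}
    closure = set(pairs)
    changed = True
    while changed:
        new = {(u, w) for (u, v) in closure for (v2, w) in pairs if v == v2}
        changed = not new <= closure
        closure |= new
    return {(u, v, out_label) for (u, v) in closure}


def reevaluate(
    stream: List[SGT],
    query: Callable[[Set[Triple]], Set[Triple]],
    instants: Iterable[int],
) -> Dict[int, Set[Triple]]:
    """Reference results: the one-time query applied to each snapshot."""
    return {t: query(snapshot(stream, t)) for t in instants}
```

A snapshot-reducible streaming operator must produce, at every time instant, exactly the set that `reevaluate` returns for that instant; the sketch is intended as a correctness oracle, not as an efficient evaluation strategy.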
Additionally, as opposed to streaming approximation techniques that trade off exact answers in favour of bounding the space requirements, window-based query evaluation enables exact query answers w.r.t. window specifications. Hence, we adopt the time-based sliding window for SGQ in the remainder.

Definition 15 (Streaming Graph Query, SGQ). An SGQ is an RQ defined over a streaming graph $S$ and a time-based sliding window $\mathcal{W}_T$ whose semantics is defined using the corresponding one-time RQ and the notion of snapshot reducibility (Definition 14), i.e., each snapshot of the query result equals the result of the one-time RQ over the corresponding snapshot of the windowed streaming graph.

Figure 5 illustrates the correspondence between streaming and one-time graph queries. A direct consequence of such a relationship is that SGQ can be evaluated by repeatedly executing the corresponding one-time query, known as query re-evaluation [25]. Specifically, the resulting streaming graph of an SGQ can be obtained by evaluating the corresponding one-time query over the snapshot graph at each time instant. However, such a strategy is wasteful as the input differences between two consecutive instants are likely to be small. Alternatively, incremental evaluation computes the changes in the output as new sgts arrive and old sgts expire due to window movements. In this paper, we focus on the incremental evaluation method and use the concept of snapshot reducibility to ensure correct evaluation semantics.

The SGQ model formalizes an important class of graph queries in the streaming model. It captures the core features of current graph query languages such as subgraph pattern and reachability-based path queries. In this section, we demonstrate SGQ's expressive power by mapping core G-CORE constructs to SGQ. We choose G-CORE for demonstration for the following reasons. G-CORE fulfills all three requirements of graph querying (R1, R2 & R3 in Section 1). Other existing languages (e.g., SPARQL v1.1, Cypher, PGQL) can only partially satisfy these requirements due to (i) the lack of algebraic closure and composability, and (ii) limited path navigation capability [14].
Moreover, G-CORE supports SGQ capabilities such as the treatment of paths as first-class citizens and returning graphs. Finally, G-CORE is one of the more prominent language specifications influencing the ongoing standardization process of a graph query language, GQL (https://www.gqlstandards.org).

G-CORE originally targets one-time queries over static property graphs and does not provide native windowing constructs. We slightly extend the ON clause with a WINDOW clause to incorporate window specifications. In particular, a time-based sliding window is defined by the newly introduced WINDOW clause that specifies the window length, and an optional SLIDE clause that specifies the slide interval, following a streaming graph reference in the ON clause.

Example 3. The G-CORE query in Figure 6 represents the real-time notification task of Example 1 (its corresponding RQ is already given in Example 2). Its PATH and MATCH clauses use ASCII-art syntax (b) to define complex graph patterns (f) with RPQ-based reachability (e), and its CONSTRUCT clause returns a streaming graph of notify edges (a).

Example 4. The G-CORE query in Figure 7 combines streaming information from a social network of user interactions and a transaction network of customer purchases to drive product recommendations. It defines a view over the resulting streaming graph of recommendation edges (d) by joining patterns from two streaming graphs (c), and its MATCH clause features optional predicates to incorporate two alternative social interactions (d). Its graph pattern corresponds to an RQ whose predicates represent the labels likes, follows, post, purchase, acquaintance, and recommendation.

This section presents the logical foundation of our streaming graph query processing framework. We first introduce streaming graph algebra (SGA) and the semantics of its operators (Section 5.1).
The main motivation behind the proposed SGA is similar to that of relational DBMSs: it enables us to formulate and represent query plans independent of specific physical implementations. We subsequently describe how to transform a given SGQ (Definition 15) into its canonical SGA expression, illustrate logical query plans (Section 5.2) and prove its closedness and composability (Section 5.3). Finally, we present some transformation rules for SGA to demonstrate alternative logical plan generation opportunities (Section 5.4).

For ease of exposition, the rest of the paper assumes that inputs to each SGA operator are partitioned into one or more streaming graphs $S_l$ based on tuple labels, where each $S_l$ contains sgts with the same label $l \in \Sigma$. The output of each operator is also a streaming graph where each sgt has the label $d \in \Sigma \setminus \phi(E_I)$ (footnote 6).

The window size $T$ determines the length of the validity interval of sgts and the slide interval $\beta$ controls the granularity at which the time-based sliding window progresses [8, 57]. If $\beta$ is not provided, the default is $\beta = 1$, i.e., a single time instant of the smallest granularity, and it defines a sliding window that progresses at every time instant.

The WSCAN operator defines the semantics of time-based sliding windows. It acts as an interface between the external streaming graph sources and the query plans, and it is responsible for providing data from input graph streams to a query plan, similar to the scan operator in relational systems. WSCAN manipulates the implicit temporal attribute and associates a time interval to each sgt representing its validity. Our model of representing streaming graphs (Definition 8) provides a concise representation of validity intervals and enables operators to treat time differently than the data stored in the graph. This enables us to distinguish operator semantics from window semantics and eliminates the redundancy caused by integrating sliding window constructs into each operator of the algebra.
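The role of WSCAN described above, turning timestamped input edges into tuples with validity intervals, can be sketched in Python as follows. With the default slide of 1, each edge with timestamp `t` becomes valid over `[t, t + T)`, which agrees with the 24-unit intervals such as [7, 31) that survive from Figure 3. How the expiry is aligned for a slide greater than 1 is an assumption of this sketch (expiry is rounded down to the slide boundary before adding the window size); the text above does not spell out that formula. The names `SGE`, `SGT` and `wscan` are hypothetical.

```python
from typing import Iterable, Iterator, NamedTuple, Tuple


class SGE(NamedTuple):
    src: str
    trg: str
    label: str
    ts: int


class SGT(NamedTuple):
    src: str
    trg: str
    label: str
    ts: int
    exp: int
    payload: Tuple[Tuple[str, str, str], ...]


def wscan(stream: Iterable[SGE], window: int, slide: int = 1) -> Iterator[SGT]:
    """Assign a validity interval to every input edge of the stream."""
    if slide <= 0 or window < slide:
        raise ValueError("require 0 < slide <= window")
    for e in stream:
        # Assumption: expirations are aligned to slide boundaries.
        # With slide == 1 this reduces to exp = ts + window.
        exp = (e.ts // slide) * slide + window
        yield SGT(e.src, e.trg, e.label, e.ts, exp, ((e.src, e.trg, e.label),))
```

Because the operator is a generator, it consumes the (potentially unbounded) input lazily and emits one sgt per input edge, so downstream operators only ever see validity intervals and never the window specification itself.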
SGA operators access and manipulate validity intervals implicitly, generalizing their non-streaming counterparts with implicit handling of time. Example 5. Consider the real-time notification task of Example 1 with a 24-hour window of interest. WSCAN W 24 sets validity intervals of sges of the input graph stream and produces a streaming graph where each sgt is valid for 24 hours, as shown in Figure 3 . Definition 17 (FILTER). Filter operator Φ ( ) is defined over a streaming graph and a boolean predicate Φ involving the distinguished attributes of sgts, and its output stream consists of sgts of on which Φ evaluates to true. Formally: Definition 18 (UNION). Union ∪ [ ] with an optional output label ∈ Σ \ ( ) merges sgts of two streaming graphs 1 and 2 , and assigns the new label if provided. Formally: are endpoints of sgts in , and , ∈ { 1 , 1 , · · · , , } are the endpoints of resulting sgts, and ∈ Σ \ ( ) represents the label of the resulting sgts. Formally: Given a subgraph pattern expressed as a conjunctive query, PATTERN finds a mapping from vertices in the stream to free variables where (i) all query predicates hold over the mapping, and (ii) there exists a time instant at which each edge in the mapping is valid. . Its output over the streaming graph, given in Figure 3 , consists of sgts ( , , , [28, 37) , ( , , )) and ( , , , [29, 31) , ( , , )) that correspond to derived edges with label recentLiker. SGA operators may produce multiple value-equivalent sgts with adjacent or overlapping validity intervals. Unless otherwise specified, such sgts in resulting streaming graphs of SGA operators are coalesced to maintain the set semantics of streaming graphs and their snapshots (Definition 10). To illustrate, consider PATTERN in the above example: over the streaming graph given in Figure 3 , the PATTERN operator finds two distinct subgraphs with vertices ( , , ) and ( , , ).
Consequently, it produces two value-equivalent tuples ( , , , [29, 31) , ( , , )) and ( , , , [30, 31) , ( , , )), which are coalesced into a single sgt by merging their validity intervals. Definition 20 (PATH). The streaming path navigation operator is defined as P ( 1 , · · · , ) where is a regular expression over the alphabet { 1 , · · · , } ⊆ Σ, and ∈ Σ \ ( ) designates the label of the resulting sgts. The sgt = ( , , , [ , ) , D : ) is an answer for P if there exists a path between and in the snapshot of at time , i.e., : → ∈ ( ) = , and the label sequence of the path , ( ) is a word in the regular language ( ). Formally: PATH finds pairs of vertices that are connected by a path where (i) each edge in the path is valid at the same time instant, and (ii) path label is a word in the regular language defined by the query. This closely follows the RPQ model where path constraints are expressed using a regular expression over the set of labels [72] . Path navigation queries in the RPQ model are evaluated under arbitrary and simple path semantics. The former allows a path to traverse the same vertex multiple times, whereas under the latter semantics a path cannot traverse the same vertex more than once [7, 10, 72] . In this paper, we adopt the arbitrary path semantics due to its widespread adoption in modern graph query languages [6, 7, 70] , and the tractability of the corresponding evaluation problem [10] . Most existing work on the RPQ model focuses on the problem of determining reachability between pairs of vertices connected by a path conforming to given regular expression [38, 46, 52, 57] . By adapting the materialized path graph model (Definition 6), we pinpoint that PATH is equipped with the ability to return paths, i.e., each resulting sgt contains the actual sequence of edges that form the path with a label sequence conforming to given regular expression. 
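The snapshot-level semantics of PATH under arbitrary path semantics can be sketched as a breadth-first traversal over the product of the snapshot graph and a DFA for the regular expression. The sketch below is only an illustration under stated assumptions: the function name `path_pairs`, the tuple layout of edges and the dictionary encoding of the DFA are hypothetical, only non-empty paths are reported, and the actual paths (the payload of resulting sgts) are not materialized.

```python
from collections import deque


def path_pairs(edges, delta, finals, t, start=0):
    """Return vertex pairs connected at time t by a path whose label word is accepted.

    edges : iterable of (src, trg, label, ts, exp) with validity interval [ts, exp)
    delta : DFA transitions as a dict {(state, label): next_state}
    """
    # Snapshot at time t: keep only the edges that are valid at t.
    adj = {}
    for u, v, label, ts, exp in edges:
        if ts <= t < exp:
            adj.setdefault(u, []).append((label, v))

    results = set()
    for root in adj:
        # Visiting each (vertex, state) pair once keeps the traversal finite
        # even when the snapshot contains cycles (arbitrary path semantics).
        seen = {(root, start)}
        queue = deque([(root, start)])
        while queue:
            u, state = queue.popleft()
            for label, v in adj.get(u, []):
                nxt = delta.get((state, label))
                if nxt is None or (v, nxt) in seen:
                    continue
                seen.add((v, nxt))
                queue.append((v, nxt))
                if nxt in finals:
                    results.add((root, v))
    return results


# DFA for the expression a+ : state 0 is initial, state 1 is accepting.
A_PLUS = {(0, "a"): 1, (1, "a"): 1}
EDGES = [("x", "y", "a", 1, 10), ("y", "z", "a", 5, 8)]
print(path_pairs(EDGES, A_PLUS, {1}, t=6))
```

At t = 6 both edges are valid, so the pair (x, z) is reported in addition to the two single-edge pairs; at t = 2 or t = 9 only (x, y) remains.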
SGA builds on the Regular Property Graph Algebra (RPGA) [14] , which is itself based on Regular Queries (RQ). Of course, both RPGA and RQ formulate graph queries over static graphs, while SGA operators are defined over streaming graphs (Definition 8), and they access and manipulate validity intervals implicitly. Thus they generalize their non-streaming counterparts with implicit handling of time. This follows from the fact that the semantics of SGA operators are defined through snapshot reducibility (Definition 14), that is, the snapshot of the result of a streaming operator on a streaming graph at time is equal to the result of the corresponding non-streaming operator on the snapshot of the streaming graph at time . SGA can express all queries that can be specified by SGQ (Section 4). This section provides an algorithm for the conversion. Given a SGQ ( , W T ) over a streaming graph and a time-based sliding window definition, Algorithm SGQParser produces the canonical SGA expression. The algorithm processes the predicates of a given SGQ and generates the corresponding SGA expression in a bottom-up manner: each EDB corresponds to a WSCAN over an input streaming graph , each application of transitive closure corresponds to a PATH, each IDB corresponds to a UNION or PATTERN based on the body of the corresponding rule. Proof. The dependency graph of an RQ is acyclic as RQ is nonrecursive (Definition 13); hence, Line 2 is guaranteed to define a partial order over 's predicates. Algorithm SGQParser generates an SGA expression for each predicate in this order and caches it in array. In particular, Line 7 generates an SGA expression for each EDB predicate and Line 9 generates a PATH expression for each body predicate with a Kleene star. For each rule ( , ) := 1 ( 1 , 1 ), · · · , ( , ), Line 13 generates a PATTERN expression. Finally, Line 14 generates a UNION expression if there are multiple rules with the same head predicate . 
As each predicate is processed based on the partial order defined by the dependency graph (Line 4), is guaranteed to have SGA expressions for each predicate (1 ≤ ≤ ) when processing predicate . Once all predicates are processed, Line 18 returns the SGA expression of the predicate. Hence, Algorithm SGQParser correctly constructs an SGA expression for a given SGQ. □ The complexity of evaluating SGA expressions is the same as RQ given their relationship noted above: NP-complete in combined complexity and NLogspace-complete in data complexity [14, 61] . Example 8 (Canonical Translation). For the real-time notification task in Example 1 and its corresponding RQ in Example 2, Algorithm SGQParser generates the following canonical SGA expression for its corresponding SGQ with a sliding window of 24 hours: Algebraic closure is a required property of any query algebra as it enables query rewriting (Section 5.4) and query optimization. Composability is a desired feature for a declarative query language as it facilitates query decomposition, view-based query evaluation, query rewriting etc. SGA operators are closed over streaming graphs as defined in Section 3; that is, the output of an SGA operator is a valid streaming graph if its inputs are valid streaming graphs. Thus SGA queries are composable, i.e., the output of one query can be used as input of another query. SGQ language is also closed (Theorem 1) -each query takes one or more streaming graphs as input and produces a streaming graph as output. It is also composable as the output of a query can be the input of the subsequent query. As such, G-CORE variation that we use as our user-level query language example (Section 4.2) attains composability exactly as its original version is composable over property graphs [6] . This is in contrast to the other graph query languages that lack an algebraic basis like ours, e.g., SPARQL and Cypher are not composable and may not be closed. 
Cypher 9 requires graphs as input, but produces tables as output so the language is neither closed nor composable -the results of a Cypher query cannot be used as input to a subsequent one without additional processing. SPARQL can produce graphs as output using the CONSTRUCT clause, and is therefore closed; however, it requires query results to be made persistent and is therefore not easily composable [14] . As noted above, closedness of an algebra is important for query rewriting to explore the space of equivalent plans; this is a key component of query planning and optimization. Although optimization problems are beyond the scope of this paper, in this section we highlight possible transformation rules that enable the systematic exploration of the plan space in order to demonstrate the potential of our SGA. Some of the traditional relational transformation strategies such as join ordering and predicate pushdown are applicable to UNION, FILTER and PATTERN due to snapshot-reducibility. UNION and FILTER operators are streaming generalizations of corresponding relational union and selection operators, and the PATTERN operator can be represented using a series of equijoins. Below, we describe transformation rules involving the other novel SGA operators: Transformation Rules for WSCAN: WSCAN (W T ) commutes with operators that do not alter the validity intervals of sgts, i.e., UNION and FILTER. Pushing FILTER down the WSCAN operator can potentially reduce the rate of sgts and consequently the amount of state the windowing operator needs to maintain. Formally: Transformation Rules for PATH: We identify two transformation rules for PATH: (1) Alternation: $\mathcal{P}^{d}_{a \mid b}(S_a, S_b) = \cup_{[d]}(S_a, S_b)$ (2) Concatenation: $\mathcal{P}^{d}_{a \cdot b}(S_a, S_b) = \bowtie^{src_1, trg_2, d}_{trg_1 = src_2}(S_a, S_b)$ These transformation rules enable the exploration of a rich plan space for SGQ that are represented by SGA.
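The two PATH rules can be checked on a single snapshot with a small Python sketch. It works at the level of one static snapshot only (validity intervals are ignored), and all names (`eval_rpq`, `by_label`, `concat_join`) as well as the example edges are hypothetical; it illustrates the equivalences rather than the paper's operators.

```python
from collections import deque


def eval_rpq(edges, delta, finals, start=0):
    """Automaton-guided RPQ evaluation over a snapshot of (src, label, trg) edges."""
    adj = {}
    for u, label, v in edges:
        adj.setdefault(u, []).append((label, v))
    out = set()
    for root in adj:
        seen = {(root, start)}
        queue = deque([(root, start)])
        while queue:
            u, state = queue.popleft()
            for label, v in adj.get(u, []):
                nxt = delta.get((state, label))
                if nxt is not None and (v, nxt) not in seen:
                    seen.add((v, nxt))
                    queue.append((v, nxt))
                    if nxt in finals:
                        out.add((root, v))
    return out


def by_label(edges, label):
    """Partition of the snapshot that holds the edges with one label."""
    return {(u, v) for u, l, v in edges if l == label}


def concat_join(s1, s2):
    """Equijoin on trg1 = src2, projecting (src1, trg2)."""
    return {(u, w) for u, v in s1 for v2, w in s2 if v == v2}


EDGES = [("x", "a", "y"), ("y", "b", "z"), ("x", "b", "w"), ("w", "a", "z")]
S_A, S_B = by_label(EDGES, "a"), by_label(EDGES, "b")

# Rule (1): PATH over a|b yields the same pairs as the union of both inputs.
ALT = {(0, "a"): 1, (0, "b"): 1}
assert eval_rpq(EDGES, ALT, {1}) == S_A | S_B

# Rule (2): PATH over a.b yields the same pairs as a join on trg1 = src2.
CAT = {(0, "a"): 1, (1, "b"): 2}
assert eval_rpq(EDGES, CAT, {2}) == concat_join(S_A, S_B)
```

Either side of each equivalence can therefore serve as a plan alternative; which one is cheaper depends on the data and is outside the scope of this sketch.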
In particular, PATH and its transformation rules enable the integration of existing approaches for RPQ evaluation with standard optimization techniques such as join ordering and pushing down selection in a principled manner. Traditionally, path query evaluation follows two approaches [10, 23, 38, 57, 65, 73] : graph traversals guided by finite automata, or relational algebra extended with transitive closure, i.e., α-RA. Yakovets et al. introduce a hybrid approach (Waveguide) and model the cost factors that impact the efficiency of RPQ evaluation on static graphs [73] . SGA enables the representation of these approaches in a uniform manner, and the above transformation rules enable the exploration of a rich plan space that subsumes these existing plans. An example application of these transformation rules and a micro-benchmark demonstrating the potential benefits of query optimizations through plan space exploration is given in Section 7.4. We implemented a prototype streaming graph query processor (https://dsg-uwaterloo.github.io/s-graffito/) based on the algebraic framework we propose in this paper. Our goal is to build a plausible conceptual framework for expressing and evaluating SGQ -not to have a complete system. Hence, we focus on the construction of query execution plans and physical implementations of logical SGA operators. Conceptually, SGQ can be evaluated by repeatedly evaluating from scratch the corresponding one-time query at each point in time (Section 4.1). Albeit semantically correct, such a re-execution strategy is, of course, infeasible. In streaming systems, the focus is on incremental evaluation of persistent queries where the goal is to avoid re-computing the entire result by only computing the changes to the output in real-time as new input arrives. Consequently, queries are executed in a data-driven (push-based) manner as opposed to the demand-driven (pull-based) query processing employed in traditional relational DBMSs [30] .
As such, one can implement the framework proposed in this paper over existing streaming systems such as Apache Flink, Spark Streaming and Timely Dataflow. For our prototype, we use Timely Dataflow (TD) as the underlying execution engine. Applications in TD are expressed as a directed graph of operations where vertices correspond to user-defined computations and edges correspond to the flow of data between them. In executing an SGQ, the query processor first creates a logical plan from the canonical SGA expression of the given query. The physical execution plan in the form of a dataflow graph is constructed by: (i) creating source vertices for leaves of the logical query plan that receives input graph streams, (ii) replacing logical SGA operators with physical operator implementations, and (iii) creating a sink vertex for the root of the logical query plan that pushes results back to the user. Section 6.2 describes physical operator implementations in detail. TD associates each input data with a logical timestamp that enables fine-grained synchronization and progress tracking. We represent each input graph stream as an evolving collection where each item represents an sge (Definition 4) and event timestamps assigned by the source are used as logical timestamps. Upon the arrival of a new edge, TD propagates the corresponding sge through the physical execution plan and computes the new output at the given logical timestamp. 6.2.1 Overview. Physical operator implementations for streaming systems have two requirements: they should be push-based and non-blocking, so they do not need the entire input to be available before producing the first result. The standard dataflow implementations of stateless FILTER and UNION operators can be directly used in SGA, and WSCAN can be implemented via the standard map operator that adjusts the validity intervals of sgts based on window specifications. 
We focus on the stateful operators PATTERN and PATH that need to maintain an internal operator state that is accessed during query processing. This state is updated as new sgts enter the window and old sgts expire. As discussed earlier, time-based sliding windows ensure that the portion of the input that may contribute to any future result is finite, making incremental, non-blocking computation possible. TD's Differential Dataflow (DD) layer [51] provides a set of built-in, high-level programming primitives (operators) that can be used to compose arbitrary dataflows for general-purpose computations, and it automatically incrementalizes these. Consequently, DD can be asked to evaluate SGQ by (i) creating a dataflow of DD operators for a given SGQ and (ii) maintaining the window content as an evolving collection. Indeed, we use such a strategy as a competitive baseline in our experimental evaluation (Section 7.2.2). However, DD's generality comes at a performance cost for evaluating SGQ, as we show in Section 7.2.2. Below, we describe how to devise physical operator implementations specific to SGQ by utilizing the properties of the SGQ model. Of course, these are not the only physical operator implementations that are possible for SGA; other implementations can possibly be developed, and these are exemplars to demonstrate the implementability of the SGA operators. They are also what we use in the experiments. Matching. Implementation of PATTERN models subgraph patterns as conjunctive queries that can be evaluated using a series of joins. There is a rich literature on streaming join implementations that can be used. Symmetric hash join [71] is commonly used to implement non-blocking joins in the streaming model: a hash table is built for each input stream and upon arrival (expiration) of a tuple, it is inserted into (removed from) its corresponding hash table and other tables are probed for matches [28, 69] .
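A symmetric hash join over two inputs can be sketched in a few lines of Python. The sketch assumes that every tuple carries a half-open validity interval [ts, exp) and that a join result is valid on the intersection of the intervals of its participants; the class name, method names and tuple layout are hypothetical and do not reflect the prototype's implementation.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Non-blocking binary equijoin: one hash table per input, probed on arrival."""

    def __init__(self):
        # tables[side][key] -> list of (payload, ts, exp)
        self.tables = (defaultdict(list), defaultdict(list))

    def insert(self, side, key, payload, ts, exp):
        """Insert a tuple arriving on input `side` (0 or 1) and return new results."""
        self.tables[side][key].append((payload, ts, exp))
        results = []
        for other, ots, oexp in self.tables[1 - side].get(key, ()):
            lo, hi = max(ts, ots), min(exp, oexp)
            if lo < hi:  # both tuples must be valid at some common time instant
                pair = (payload, other) if side == 0 else (other, payload)
                results.append((pair, lo, hi))
        return results

    def purge(self, now):
        """Drop tuples whose validity interval ended at or before `now`."""
        for table in self.tables:
            for key in list(table):
                table[key] = [e for e in table[key] if e[2] > now]
                if not table[key]:
                    del table[key]


join = SymmetricHashJoin()
join.insert(0, "p1", ("u1", "likes", "p1"), 28, 37)
print(join.insert(1, "p1", ("p1", "hasCreator", "u2"), 30, 33))
```

Since each result already carries its own expiry, a consumer can discard it once the current time reaches `hi` without the join having to process expired inputs explicitly.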
This approach produces an append-only stream of results for internal windows that do not invalidate previously reported results upon expiration of their participating tuples [26] . For external windows that require eviction of old results as the windows slide forward, expired results can be determined by maintaining expiration timestamps: a join result expires when one of its participating tuples expires. Our use of validity intervals enables the user or the application to adopt both window semantics without the need for explicit processing of expired input tuples. Given a subgraph pattern, we take the standard approach of creating a binary join tree where leaves represent streaming graphs as input streams and internal nodes represent pipelined hash join operators. For instance, Figure 8 (right) shows the logical plan for the query in Example 1 and the join tree for its PATTERN. In our prototype, we use the ordering of predicates in PATTERN to construct the join tree and leave the problem of finding efficient join plans (e.g. using worst-case optimal joins [55] ) for future investigation. DD's iterate allows constructing cyclic dataflows that can model arbitrary nested iterations, and it can be used to evaluate PATH and its recursive path expressions. However, the use of recursion in SGQ is limited to transitive closure (Section 4), and the RPQ-based semantics of PATH is sufficient to evaluate this limited form of recursion (Theorem 1). In previous work [57] , we study the design space of streaming algorithms for persistent RPQ under arbitrary path semantics. In brief, the streaming RPQ algorithm follows the automata-based RPQ evaluation method [14] and maintains a spanning forest-based data structure, called Δ-tree index, that enables the compact representation of partial path segments.
In our prototype, we employ the streaming RPQ algorithm in [57] as the non-blocking, physical implementation of the PATH operator, which eliminates the need for cycles in physical execution plans. Additionally, a spanning-tree-based representation of intermediate results enables us to recover actual paths and allows queries to return and manipulate paths as first-class citizens. 6.2.4 Customized PATH Implementation. We now describe a novel algorithm Streaming Path Navigation (S-PATH) that can be used as an alternative physical operator for the PATH operator (Definition 20). In contrast to our previous work [57] , S-PATH utilizes the validity intervals of path segments to simplify the state maintenance in the absence of explicit deletions. Algorithms in [57] are based on the negative tuple approach; expirations due to window movements are processed using the same machinery as explicit deletions. Upon expiration (deletion) of an edge, the algorithm in [57] first finds all results that are affected by the expiration (deletion), then it traverses the snapshot graph to ensure that there is no alternative path leading to the same result. This corresponds to the re-derivation step of DRed [32] , optimized for RPQ evaluation on streaming graphs. Instead, S-PATH utilizes the temporal pattern of sliding window movements and adopts the direct approach, i.e., it can directly determine expired tuples based on their validity intervals. This is possible due to the separation of the implementation of sliding windows from operator semantics via an explicit WSCAN operator. Algorithm S-PATH incrementally performs a traversal of the underlying snapshot graph under the constraints of a given RPQ as sgts arrive. It first constructs a DFA from the regular expression of a PATH operator, and initializes a spanning forest-based data structure, called Δ − PATH, that is used as the internal operator state during query processing.
Δ − PATH is used to maintain a path segment, i.e., a partial result, between each pair of vertices in the form of a spanning forest under the constraints of a given RPQ, consistent with Definition 20. Upon the arrival of an sgt, Algorithm S-PATH probes Δ − PATH to retrieve partial path segments that can be extended with the edge (or a path segment) of the incoming sgt. Each partial path segment is extended with the incoming sgt, and Algorithm S-PATH traverses the snapshot graph until no further expansion is possible. Definition 21 (Spanning Tree ). Given an automaton for the regular expression of a PATH operator P and a streaming graph at time , a spanning tree forms a compact representation of valid path segments that are reachable from the vertex ∈ under the constraints of a given RPQ, i.e., a vertex-state pair ( , ) is in at time if there exists a path ∈ from to with label ( ) such that = * ( 0 , ( )). A node ( , ) ∈ indicates that there is a path in the snapshot graph with label ( ) such that = * ( 0 , ( )), and this path can simply be constructed by following parent pointers (( , ). ) in . Under the arbitrary path semantics, there are potentially infinitely many path segments between a pair of vertices that conform to a given RPQ due to the presence of cycles in the snapshot graph and a Kleene star in the given RPQ. Among those, S-PATH materializes the path segment with the largest expiry timestamp, that is, the path segment that will expire furthest in the future. Consequently, for each node ( , ) ∈ , the sequence of vertices in the path from the root node to ( , ) corresponds to the path from to in the snapshot graph with the largest expiry timestamp. This is achieved by the coalesce primitive (Definition 11) with an aggregation function max over the expiry timestamp of path segments.
Upon expiration of a node ( , ) in and its corresponding path segment in the snapshot graph, this guarantees that there cannot be an alternative path segment between and that has not yet expired. Hence, we can directly find expired tuples based on their expiry timestamps. This is based on the observation that expirations have a temporal order unlike explicit deletions, and S-PATH utilizes these temporal patterns to simplify window maintenance. Definition 22 (Δ − PATH Index). Given an automaton for the regular expression of a PATH operator P and a streaming graph at time , Δ − PATH is a collection of spanning trees (Definition 21) where each tree is rooted at a vertex ∈ for which there is an sgt ∈ ( ) with a label such that ( 0 , ) ≠ ∅ and = . Δ − PATH encodes a single entry for each pair of vertices under the constraints of a given query, consistent with the set semantics of snapshot graphs (Section 3). Due to spanning-tree construction (Definition 21), actual paths can easily be recovered by following the parent pointers; hence, Δ − PATH constitutes a compact representation of intermediate results for path navigation queries over materialized path graphs. Our implementation models Δ − PATH as a hash-based inverted index from vertex-state pairs to spanning trees, enabling quick look-up to locate all spanning trees that contain a particular vertex-state pair. Upon arrival of an sgt = ( , , , [ , ) , D), Algorithm S-PATH probes this inverted index of Δ − PATH to retrieve all path segments that can be extended with the incoming sgt, that is, spanning trees that have the node ( , ) with an expiry timestamp smaller than for any state ∈ { ∈ | ( , ) ≠ ∅} (Line 14). If the target node ( , ) for = ( , ) is not in the spanning tree , Algorithm Expand is invoked to expand the existing path segment from ( , 0) to ( , ) with the node ( , ) and to create a new leaf node as a child of ( , ).
In case there already exists a path segment between vertices ( , 0) and ( , ) in Δ − PATH, i.e., the target node ( , ) is already in , Algorithm S-PATH compares its expiry timestamp with the new candidate (Line 18). If the extension of the existing path segment from ( , 0) to ( , ) with ( , ) results in a larger expiry timestamp than ( , ). , Algorithm Propagate is invoked to update the expiry timestamp of ( , ) and its children in . Algorithms Expand and Propagate traverse the snapshot graph until no further update is possible. The following example illustrates the behaviour of Algorithm S-PATH on our running example. Example 9. Consider the real-time notification query in Example 1 whose SGA expression is given in Example 8. Figure 9a shows an excerpt of the streaming graph input to the PATH operator P + . Figure 9b depicts a spanning tree ∈ Δ−PATH at = 27. Upon arrival of the sgt ( , , , [28, 37) , D = {( , , )}) at = 28, Algorithm S-PATH extends the path segment from ( , 0) to ( , 1) with ( , 1), and compares its expiry timestamp with that of ( , 1) that is already in . As the new extension has larger expiry timestamp, the validity interval and the parent pointer of ( , 1) ∈ is updated (Line 18 in Algorithm S-PATH). Then, incoming sgts at times = 28 and = 29 are processed by Algorithm Expand as corresponding target nodes ( , 1) and ( , 1) are not in , adding ( , 1) and ( , 1) as children of ( , 1). At = 30, the incoming sgt ( , , , [30, 39) , D = {( , , )}) might extend the path segment from ( , 0) to ( , 1) with expiry timestamp 33. However, Algorithm S-PATH does not make any modification to Δ − PATH, as ( , 1) is already in with a larger timestamp (Line 18). Figure 9c depicts the resulting spanning tree at = 30. As described earlier, Δ − PATH stores a parent pointer for each node pointing to its parent node in the corresponding spanning tree, and Algorithm Propagate updates these pointers during processing. 
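The interplay of extending path segments and keeping the segment with the largest expiry timestamp can be sketched in Python as follows. This is a simplified illustration, not Algorithm S-PATH itself: it scans all spanning trees instead of using a hash-based inverted index, merges the roles of Expand and Propagate into one worklist loop, assumes that sgts arrive in timestamp order, and does not purge expired nodes. The class `SPath`, its methods and the edge list are hypothetical; the intervals are modelled on the example above, and the target of the last edge is an assumption.

```python
import math


class SPath:
    """Simplified sketch: one spanning tree per root, max-expiry path segments."""

    def __init__(self, delta, finals, start=0):
        self.delta = delta    # DFA transitions {(state, label): next_state}
        self.finals = finals
        self.start = start
        self.adj = {}         # window content: u -> [(label, v, ts, exp)]
        self.trees = {}       # root -> {(vertex, state): (parent, ts, exp)}

    def insert(self, u, v, label, ts, exp):
        """Process an arriving sgt and return newly derived (root, v, ts, exp)."""
        edge = (label, v, ts, exp)
        self.adj.setdefault(u, []).append(edge)
        if (self.start, label) in self.delta and u not in self.trees:
            self.trees[u] = {(u, self.start): (None, -math.inf, math.inf)}
        results = []
        for root, tree in self.trees.items():
            for state, lbl in self.delta:
                if lbl == label and (u, state) in tree:
                    self._relax(root, (u, state), edge, ts, results)
        return results

    def _relax(self, root, parent, edge, now, results):
        tree = self.trees[root]
        work = [(parent, edge)]
        while work:
            parent, (label, v, ts, exp) = work.pop()
            nxt = self.delta.get((parent[1], label))
            _, pts, pexp = tree[parent]
            lo, hi = max(pts, ts), min(pexp, exp)  # interval of the extended segment
            if nxt is None or hi <= now:
                continue                            # no transition, or already expired
            node = (v, nxt)
            if node in tree and tree[node][2] >= hi:
                continue                            # existing segment expires no earlier
            tree[node] = (parent, lo, hi)           # new leaf, or updated parent pointer
            if nxt in self.finals:
                results.append((root, v, lo, hi))
            work.extend((node, out) for out in self.adj.get(v, []))


EDGES = [("x", "z", "a", 23, 31), ("z", "u", "a", 24, 32), ("x", "y", "a", 25, 35),
         ("y", "w", "a", 26, 33), ("z", "t", "a", 27, 40), ("y", "u", "a", 28, 37),
         ("u", "v", "a", 29, 41), ("u", "s", "a", 30, 38), ("w", "u", "a", 30, 39)]
sp = SPath({(0, "a"): 1, (1, "a"): 1}, {1})
for e in EDGES:
    last = sp.insert(*e)
print(sp.trees["x"][("u", 1)])
```

In the tree rooted at x, node (u, 1) ends up with parent (y, 1) and interval [28, 35): the edge arriving at time 28 replaces the earlier segment through z that would expire at 31, while the edge from w arriving at time 30 is ignored because it would only yield expiry 33.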
By traversing these parent pointers for each resulting sgt, Algorithm Expand can construct the actual path (Line 7) and return it as a part of the resulting sgt, i.e., it populates the implicit payload attribute D of the resulting sgt with the sequence of edges that forms the resulting path. The cost of this operation is O ( ) where is the length of the resulting path. Δ − PATH guarantees that the expiry timestamp of a node ( , ) in is equal to the largest expiry timestamp of all paths between and in the snapshot graph with a label such that = * ( 0 , ). Consequently, for a node ( , ) ∈ with expiry timestamp smaller than , there cannot be another path from to with an equivalent label that is valid at time . Consider the spanning tree given in Figure 9c . Algorithm S-PATH can directly determine, without additional processing, that nodes ( , 1) and ( , 1) are expired as their expiry timestamp is 31. Thus, at any given time , Algorithm S-PATH can simply ignore a node ( , ) ∈ with an expiry timestamp smaller than (Line 18) and such nodes can be removed from Δ − PATH. To prevent Δ − PATH from growing unboundedly due to expired tuples, a background process periodically purges expired tuples from Δ − PATH. The following example illustrates how the negative tuple approach of [57] and the direct approach of our proposed algorithm differ. Example 10. Consider the same real-time notification query as in Example 9. Both approaches behave similarly until = 28 as all vertex-state pairs in have a single derivation at = 27 ( Figure 9b ). Upon arrival of the sgt ( , , , [28, 37) , D = {( , , )}) at = 28, the negative tuple approach as in [57] does not update as ( , 1) is already in , whereas the direct approach as used in this paper updates the validity interval and the parent pointer of ( , 1) ∈ (Line 18 in Algorithm S-PATH). Then, incoming sgts at times = 28 and = 29 are processed similarly, adding ( , 1) and ( , 1) as children of ( , 1).
Figures 9c and 9d depict the corresponding spanning trees at = 30 for the direct and the negative tuple approaches, respectively. Note that in Figure 9c , the validity intervals of nodes in the subtree rooted at node ( , 1) reflect the newly discovered path from to through in 30 . The negative tuple and the direct approaches differ at = 31 as multiple nodes expire. The negative tuple approach used in [57] marks the entire subtree of ( , 1) as potentially expired (Figure 9d ), and performs a traversal of the snapshot graph 31 to find alternative, valid paths for expired nodes. These traversals undo the effect of expired sgts via explicit deletions. Upon discovering alternative paths for nodes ( , 1), ( , 1) and ( , 1) that are valid at time = 31, they are re-inserted into . Instead, our proposed algorithm can directly determine the expired nodes based on the validity intervals (nodes ( , 1) and ( , 1) as shown in Figure 9c ) without additional processing.

Figure 9 : (a) A streaming graph as the input for PATH operator, (b) spanning tree at = 28, (c) spanning tree at = 30 of the proposed algorithm following the direct approach, and (d) spanning tree at = 30 of [57] following the negative tuple approach.

PATTERN and PATH rely on the direct approach that utilizes the temporal pattern of sliding windows for state maintenance; the expiration timestamps are used to directly locate expired tuples. On append-only streaming graphs, existing sgts only expire due to window movements.
Albeit rare, certain applications might require explicit deletions of previously inserted sgts, which necessitates the use of the negative tuple approach to handle such explicit deletions (see footnote 8). A deleted sgt carrying an additional flag to denote deletion, i.e., a negative tuple, is used to undo the effect of the original sgt on the operator state and to invalidate previously reported results, if necessary. For pipelined hash join, which is used in the implementation of PATTERN, processing of negative tuples is the same as that of original input tuples: a negative tuple is removed from its corresponding hash table and other tables are probed to find corresponding deleted results. The use of negative tuples for explicit deletions in the context of RPQ evaluation was first proposed by Pacaci et al. [57] . Here, we describe how to adopt the negative tuple approach for explicit deletions of sgts in Algorithm S-PATH. In brief, upon explicit deletion of an sgt, we first identify tree-edges that disconnect spanning trees in Δ − PATH. For each such tree-edge, we first mark the nodes in the subtree that are disconnected due to the explicit deletion. Then, for each node in this subtree, we use a Dijkstra-based traversal over the snapshot graph to find an alternative path with the largest expiry timestamp. Dijkstra's algorithm guarantees that we can efficiently find the path with the largest expiry timestamp for each marked node, consistent with Definition 22. A marked node is removed from the spanning tree only if there is no alternative valid path. Deletion of a non-tree edge does not require any modification as it leaves spanning trees unchanged. Footnote 8: The negative tuple approach can also be used to signal expirations through an explicit deletion of the corresponding sgt to undo its effect [25, 29, 57] .
Indeed, the negative-tuple approach can be used for incremental evaluation of an arbitrary computation, whereas direct approach is applicable to negation-free queries over append-only streaming graphs [29] . Our objective is to demonstrate the feasibility of implementing a performant system that incorporates the algebraic framework we propose in this paper. Using the prototype implementation described in §6, we first provide an end-to-end performance analysis of our algebraic approach for persistent evaluation of streaming graph queries ( §7.2). Then, we assess the scalability by varying the window size T and the slide interval ( §7.3). Finally, we highlight the benefits of the proposed SGA in exploring the rich plan space through transformation rules and demonstrate the potential performance improvements of exploring this plan space ( §7.4). 7.1.1 Setup. Experiments are run on a Linux server with 32 physical cores and 256GB memory. For each query and configuration, we report the tail latency of each window slide, i.e., the total time to process all arriving and expired sgts upon window movement and to produce new results, and the average throughput after ten minutes of processing on warm caches. We use Stackoverflow (SO) and LDBC SNB (SNB) graphs for our experimental analysis; these are publicly available, large-scale graphs with labelled and timestamped edges on which persistent queries with complex graph patterns can be formulated. SO is a temporal graph of user interactions on the stackoverflow website containing 63M interactions (edges) of 2.2M users (vertices), spanning 8 years [59] , and SNB is a synthetic social network graph that simulates the interactions of an online social network [22] . We extract the update stream of the LDBC workload that contains 8 different types of interactions, and we use replyOf, hasCreator and likes edges between users and posts, and knows edges between users. 
We use a scale factor of 10 with 7.2M users and posts (vertices) and 40M user interactions (edges). SO contains only a single type of vertex and 3 different edge labels, and its cyclic nature causes a high number of intermediate results and resulting paths; it is therefore the more challenging dataset for the proposed algorithms. We set the window size T to 1 month and the slide interval to 1 day unless specified otherwise.

Table 1: Queries 1-4 correspond to common RPQs observed in real-world query logs [15], and queries 5-7 are Datalog encodings of RQ-based complex graph patterns that we use to define streaming graph queries. Queries 5 and 6 correspond to the complex graph patterns of LDBC SNB queries IS7 and IC7 [22], respectively, and query 7 corresponds to the complex graph pattern given in Example 1, which is defined as a recursive path query over the graph pattern of query 6. The edge predicates in these patterns are instantiated based on the dataset characteristics.

To the best of our knowledge, no current benchmark exists featuring RQ for graph DBMSs. The existing benchmarks are limited to UCRPQ, thus not capturing the full expressivity of RQ even for static graphs. Streaming RDF benchmarks such as LSBench (https://code.google.com/archive/p/lsbench/) and Stream WatDiv [24] only focus on SPARQL v1.0 (thus not even including simple RPQs), and their workloads do not contain any recursive queries. Hence, we formulate a set of SGQs from existing UCRPQ-based workloads as follows: we collect a set of graph patterns in the form of UCRPQ from existing benchmarks and studies [11, 15, 22, 57, 73], and we compose a set of complex graph patterns from those by applying a Kleene star over each graph pattern. Table 1 lists the set of graph patterns of increasing expressivity (from RPQ to complex RQ with complex graph patterns) that we use to define streaming graph queries. Queries 1-4 are commonly used RPQs in existing studies [15, 57, 73], and we use those to test our PATH operator.
Queries 5 and 6 are CRPQ-based complex graph patterns based on SNB queries IS7 and IC7 [22]. For instance, query 6 (IC7 of SNB), with edge labels knows, likes and hasCreator, asks for recent likers of a person's messages that are also connected by a path of friends. Query 7 (Example 1) is the most expressive RQ-based complex graph pattern; we use it to demonstrate the ability of the proposed SGA to unify subgraph pattern and path navigation queries in a structured manner and to treat paths as first-class citizens. It defines a path query over the complex graph pattern of query 6: it finds arbitrary-length paths where users are connected by the recentLiker pattern. Note that this query cannot be expressed in existing graph query languages such as Cypher and SPARQL. Finally, for each dataset, we instantiate the query workload from these graph patterns by choosing appropriate predicates, i.e., edge labels, for each query edge and by setting the duration of the time-based sliding window W_T as described above.

Table 2 (SGA) shows the aggregated throughput and tail latency of our streaming graph query processor for all queries in Table 1. We discard each streaming graph edge whose label is not in a given SGQ. Tail latencies reflect the 99th percentile of the time taken to process a window slide and produce the corresponding resulting sgts. Across queries, the performance is lower for the SO graph because it is dense and cyclic. The throughput ranges from hundreds of edges per second for SO to hundreds of thousands of edges per second for SNB.

Existing work on query processing over streaming data, such as data stream management systems and streaming RDF systems, cannot process the queries in Table 1, as they focus on relational queries and SPARQL v1.0, respectively (§1). To the best of our knowledge, TD with its DD layer is the only general-purpose system that can be used to incrementally evaluate recursive computations that are modelled as cyclic dataflows.
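The two metrics reported in this evaluation, the 99th percentile latency over window slides and the average throughput, can be computed from per-slide measurements as sketched below. The function names and the nearest-rank percentile definition are assumptions made for illustration; the text does not state which percentile estimator is used.

```python
import math


def tail_latency(slide_latencies, percentile=99.0):
    """Nearest-rank percentile of per-window-slide processing times.

    Assumption: each entry is the total time spent processing one window
    slide (arriving and expired sgts, plus producing the new results).
    """
    if not slide_latencies:
        raise ValueError("need at least one window slide measurement")
    ordered = sorted(slide_latencies)
    # Nearest-rank method: smallest value with at least `percentile`
    # percent of the measurements at or below it.
    rank = math.ceil(percentile * len(ordered) / 100.0)
    return ordered[max(rank, 1) - 1]


def average_throughput(edges_processed, elapsed_seconds):
    """Average number of edges processed per second over a run."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed time must be positive")
    return edges_processed / elapsed_seconds
```

With 100 slide latencies of 1, 2, ..., 100 time units, `tail_latency` returns 99, so a single slow slide out of a hundred does not dominate the reported value, while two or more do.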
Table 2 (DD) reports the throughput and tail latency of DD dataflows for all queries in Table 1. Overall, our SGA-based query processor outperforms the DD baseline on SO and provides competitive performance on the SNB dataset. On SNB, queries 6 and 7 do not have the Kleene-plus over as it causes DD to time out. Due to the highly cyclic structure of SO, there are many alternative paths between each pair of vertices; our streaming RPQ algorithm for the PATH implementation maintains a compact representation of valid path segments and utilizes the temporal patterns of sliding window movements to simplify expirations (§6.2). The DD-based query processor provides better performance on the linear path queries 1-4 on SNB, but not on the others. This is due to the tree-shaped structure of replyOf edges in SNB, where there is only one path between a pair of vertices, so PATH-specific optimizations do not apply. Performance variations on SNB suggest optimization opportunities for recursive graph queries when selecting physical operator implementations, as is the case for streaming relational joins [29]. These results demonstrate the feasibility of our algebraic approach for evaluating SGQ and of our physical operator implementations.

In this section, we analyze the impact of the window size T and the slide interval on the end-to-end query performance of the proposed streaming graph query processor. We use the SO graph for this experiment, as its dense, cyclic structure stresses our operator implementations. Fig. 10a reports the aggregate throughput and the tail latency for each query across various window sizes. As expected, the throughput of all tested queries decreases with increasing T, as a larger window size increases the number of sgts in each window. Similarly, the tail latency of each window slide increases with increasing window size. We also assess the impact of the slide interval on performance.
As previously mentioned, the slide interval controls the time granularity at which the sliding window progresses, and our prototype implementation uses it to control the input batch size. Figure 10b shows that the aggregate throughput and the tail latency for each query remain stable across varying slide intervals. This is due to the tuple-oriented implementation of the physical operators of SGA: SGA operators are designed to process each incoming tuple eagerly in favour of minimizing tuple-processing latency, and they do not utilize batching to improve throughput with larger batch sizes. Consequently, the tail latency of window movements increases with increasing slide interval. This is in contrast to DD, whose throughput increases with increasing slide interval, as shown in Figure 11. DD and its underlying indexing mechanism, i.e., shared arrangements [50], are designed to utilize batching and improve throughput with increasing batch size: all sgts that arrive within one slide interval are batched together with a single logical timestamp (epoch), and DD operators can explore the latency vs. throughput trade-off by changing the granularity of each epoch. The investigation of batching within SGA operators and the identification of other optimization opportunities are topics of future work.

The SGA proposed in this paper provides a rich foundation for logical SGQ optimization through query rewriting, as previously discussed (§5.4). In this paper, we do not address the full scope of optimization issues (full optimizer development is the topic of ongoing research), but we design a micro-benchmark to highlight the possibilities provided by SGA. In particular, we choose query 4 (Table 1), as its linear pattern combined with a Kleene plus demonstrates the potential benefits of SGA transformation rules involving SGA's novel PATH operator. Fig.
12 demonstrates the throughput and the tail latency of different plans obtained from the following equivalent SGA expressions for query 4:
• SGA: P + ( 1 , 3 , 1 = 2 ∧ 2 = 3 ( , , ))
• P1: P ( · · ) + ( , , )
• P2: P ( · ) + ( , 1 , 2 , 1 = 2 ( , ))
• P3: P ( · ) + ( , 1 , 2 , 1 = 2 ( , ))

The first expression, SGA, is the canonical SGA expression for query 4 that is generated by Algorithm SGQParser. Such plans are called loop-caching in the literature [73], as they enable re-use of the intermediate results for the base pattern ( · · ) (footnote 9). P1, P2 and P3 are obtained from the canonical SGA expression using the transformation rules given in §5.4, and represent novel plans that are made possible by the novel PATH operator. Fig. 12 clearly illustrates the potential benefits of exploring the rich plan space offered by SGA: some of the newly computed plans provide up to a 60% increase in throughput and a 60% reduction in latency. We observe similar behaviour on the other path queries 2 and 3 (up to 50% difference in throughput). These results suggest further opportunities for logical query optimization, as query rewrites that are generated by SGA transformation rules can provide significant performance benefits for evaluating SGQ over streaming graphs.

Footnote 9: It is also the only plan available to the DD baseline, which performs a fixed-point iteration over the base pattern ( · · ).

Our prototype implementation employs the automata-based streaming RPQ algorithm of [57] as the physical implementation of the PATH operator. Section 6.2.4 describes an alternative physical implementation (Streaming Path Navigation, S-PATH) for the novel PATH operator of our proposed SGA based on the direct approach. Table 3 compares its impact on the performance of our proposed streaming graph query processor.
The performance impact of adopting Algorithm S-PATH for PATH varies depending on the workload characteristics: it improves throughput for most queries on the SO graph, whereas the performance differences on the SNB graph are small. This can be attributed to the cyclic nature of the SO graph: a higher number of intermediate path segments increases the size of the internal operator state that the physical implementations of the PATH operator need to maintain. Overall, these results illustrate the potential benefits of exploring alternative operator implementations. In particular, cost-based optimization techniques for determining suitable operator implementations based on data and query characteristics might provide significant performance benefits for evaluating SGQ over streaming graphs.

This paper introduces a general-purpose query processing framework for streaming graphs that consists of (i) a streaming graph query model and algebra with well-founded semantics, and (ii) a prototype streaming graph query processor as an embodiment of the proposed framework. The SGQ model addresses the requirements we set forth for querying streaming graphs by capturing both common features of existing graph query languages and novel features that are beyond the power of existing languages (such as the treatment of paths as first-class citizens, powerful path patterns and composability). SGA and its transformation rules provide the foundational framework to express and evaluate streaming graph queries. Our prototype streaming graph query processor exemplifies physical operator implementations specific to streaming graph queries that are made possible by this framework. Experimental analyses on real-world and synthetic streaming graphs demonstrate the feasibility and the potential performance gains of our approach. SGQ and SGA establish the basis for the systematic study of query processing issues over streaming graphs.
We are now engaged in research along two dimensions: (i) designing an SGA-based query optimizer for the systematic exploration of the rich plan space using SGA's transformation rules, and (ii) incorporating attribute-based predicates to fully support the property graph model. Additional work that would enrich the implementation includes the development of alternative physical operators and additional transformation rules for plan space enumeration. Given that the SGQ-SGA framework is complete with clear semantics (as captured in the RQ model), and given that SGQ incorporates the capabilities of existing and emerging query languages, future work can focus on efficient implementation.

The Design of the Borealis Stream Processing Engine
Aurora: a new model and architecture for data stream management
Foundations of databases
Zooming Out on an Evolving Graph
Distributed Evaluation of Subgraph Queries Using Worst-case Optimal and Low-Memory Dataflows
G-CORE: A core for future graph query languages
Foundations of modern query languages for graph databases
The CQL Continuous Query Language: Semantic Foundations and Query Execution
Models and Issues in Data Stream Systems
Querying graph databases
gMark: schema-driven generation of graphs and queries
Emanuele Della Valle, and Michael Grossniklaus. 2009. C-SPARQL: SPARQL for continuous querying
Graph queries: From theory to practice
Hannes Voigt, and Nikolay Yakovets
Navigating the Maze of Wikidata Query Logs
Enabling ontology-based access to streaming data sources
Apache Flink™: Stream and Batch Processing in a Single Engine
A Selectivity based approach to Continuous Pattern Detection in Streaming Graphs
A consensus glossary of temporal database concepts
Towards a unified language for RDF stream query processing
Stinger: High performance data structure for streaming graphs
The LDBC Social Network Benchmark: Interactive Workload
Incremental graph computations: Doable and undoable
Stream WatDiv - A Streaming RDF Benchmark
Incremental evaluation of sliding-window queries over data streams
Sliding Window Query Processing over Data Streams
Issues in data stream management
Processing Sliding Window Multi-Joins in Continuous Queries over Data Streams
Update-Pattern-Aware Modeling and Processing of Continuous Queries
Query Evaluation Techniques for Large Databases
RecService: Multi-Tenant Distributed Real-Time Graph Processing at Twitter
Maintaining Views Incrementally
Stream Processing Languages in the Big Data Era
Time-evolving graph processing at scale
TurboFlux: A Fast Continuous Subgraph Matching System for Streaming Graph Data
DBToaster: Higher-Order Delta Processing for Dynamic, Frequently Fresh Views
Sparkwave: continuous schema-enhanced pattern matching over RDF data streams
Regular path queries on large graphs
Semantics and implementation of continuous sliding window queries over data streams
Twitter Heron: Stream Processing at Scale
GraphOne: A data store for real-time analytics on evolving graphs
GraphOne: A Data Store for Real-time Analytics on Evolving Graphs
A native and adaptive approach for unified processing of linked streams and linked data
Time constrained continuous subgraph search over streaming graphs
TriAL: A navigational algebra for RDF triplestores
Regular path queries on graphs with data
Encyclopedia of Database Systems
Recursive Computation of Regions and Connectivity in Networks on Data Engineering
GraphBolt: Dependency-driven synchronous processing of streaming graphs
Shared Arrangements: Practical Inter-Query Sharing for Streaming Dataflows
Differential Dataflow
Finding regular simple paths in graph databases
Temporal graph algebra
Naiad: a timely dataflow system
Skew strikes back
Incremental view maintenance with triple lock factorization benefits
Regular Path Query Evaluation on Streaming Graphs
Do We Need Specialized Graph Databases?: Benchmarking Real-Time Social Networking Applications
Motifs in temporal networks
Real-time constrained cycle detection in large dynamic graphs
Regular queries on graph databases
The Ubiquity of Large Graphs and Surprising Challenges of Graph Processing
Graphsurge: Graph Analytics on View Collections Using Differential Computation
Nikolay Yakovets, Da Yan, and Eiko Yoneki. 2020. The Future is Big Graphs! A Community View on Graph Processing Systems
Graph query processing. Encyclopedia of Big Data Technologies
Graphin: An online high performance incremental graph processing framework
GraPU: Accelerate streaming graph analysis through preprocessing buffered updates
Storm@twitter
XJoin: A Reactively-Scheduled Pipelined Join Operator
PGQL: a property graph query language
Dataflow query execution in a parallel main-memory environment
Query languages for graph databases
Query planning for evaluating SPARQL property paths
ViewDF: Declarative Incremental View Maintenance for Streaming Data