
J Supercomput DOI 10.1007/s11227-016-1919-0

Multi-query processing of XML data streams on multicore

Soo-Hyung Kim1 · Kyong-Ha Lee2 · Yoon-Joon Lee1

© Springer Science+Business Media New York 2016

Abstract The multicore architecture has become the norm for computing systems in recent years, as it provides CPU-level support for parallelism. However, existing algorithms for processing XML streams do not fully take advantage of this facility, since they were not devised to run in parallel. In this article, we propose several methods to efficiently parallelize finite state automata (FSA)-based XML stream processing. We transform a large collection of XPath expressions into multiple FSA-based query indexes and then process XML streams in parallel by virtue of index-level parallelism. Each core works only with its own query index, so no synchronization issue arises while filtering XML streams against multiple user-given path patterns. We also present an in-memory MapReduce model that enables a large collection of twig pattern joins to be processed over XML streams simultaneously. Twig pattern joins in our approach are performed by multiple H/W threads in a shared and balanced way. Extensive experiments show that, on an 8-core CPU, our algorithm outperforms conventional algorithms by up to ten times when processing 10 million XPath expressions over XML streams.


Kyong-Ha Lee [email protected]
Soo-Hyung Kim [email protected]
Yoon-Joon Lee [email protected]

1 School of Computing, KAIST, 291 Daehak-ro, Yuseong-gu, Daejeon 34141, Korea

2 Division of Convergence Technology Research, KISTI, 245 Daehak-ro, Yuseong-gu, Daejeon 34141, Korea



Keywords Data streams · XML · Query processing · Parallel processing · Multicore architecture

1 Introduction

XML has become one of the most popular formats for data representation and transmission on the Internet [5]. Data in many applications are typed in XML by virtue of its simplicity and extensibility. XML data are, in general, dynamically generated; typical examples include stock tickers, network traffic data, web services, and log streams. Such XML data, known as XML streams, typically require real-time processing: the data must be processed as soon as they are delivered. In this respect, it has been a great challenge to process XML streams in a timely manner, and querying such continuous, unbounded, and sequentially accessed XML streams has gained significant attention in the past decade [2,11,14,20,32,33,42,44]. Specifically, XML stream processing finds structural matchings, i.e., matching elements, from a series of small XML documents against a given set of XPath expressions. Meanwhile, the multicore CPU, also known as a chip-level multiprocessor (CMP), has been rapidly adopted in various computing systems as a more cost-effective architecture that delivers more computing facilities, such as chip-level H/W thread support, at lower power consumption [21]. However, conventional XML stream processing algorithms do not directly benefit from multicore features, since they were not designed to work in parallel. Most conventional algorithms are serial, and only a few studies on parallel XML stream processing have been reported in the literature. Moreover, the recent studies on parallel XML stream processing have failed to address the issue of handling a massive set of queries over XML streams.
They only considered processing a single XPath expression [15,23,30,33,39] or multiple simple path patterns rather than complex twig patterns [44]. To shed light on these issues, we propose a set of algorithms called Distributed query Index for XML stream querY processing on multicore (DIXY). Our algorithms process XPath expressions over XML streams in parallel and are able to process many XPath expressions simultaneously in a shared and balanced way. To achieve this, we devise a new partitioning scheme that partitions an FSA-based query index into multiple indexes at runtime, rather than following conventional data or query partitioning schemes. In our approach, each core performs path pattern filtering with its own query index so as to guarantee workload balance across cores. Twig pattern joins, which join path pattern solutions to find the relevant twig pattern matchings in XML streams, are also performed in parallel; this is achieved by implementing the join operations with an in-memory MapReduce programming model. To the best of our knowledge, this is the first study on processing a massive set of XPath expressions over XML streams in parallel on a multicore CPU. The main contributions of this article are summarized as follows:



Index-level partitioning scheme: In our approach, an NFA-style query index for all XPath expressions given by users is transformed into as many DFA-style query indexes as there are cores in the system. As such, each core works only with its own query index, and no communication between cores is required for state transitions, so no synchronization issues occur while filtering XML streams. This scheme also naturally balances workloads across cores, since every core performs exactly one state transition for each incoming element.

Sharing input scans and path solutions: It is wasteful to process one XPath expression at a time when a massive set of XPath expressions is given by users. In practice, many XPath expressions in a query set share common linear path patterns. In our approach, we share input scans and path solutions so as to avoid redundant processing of path patterns and save computation and I/O. Path pattern solutions are also shared by multiple twig pattern join operations: while joining path solutions to find twig patterns, the group of join operations assigned to a reducer shares the path solutions.

Multiple twig pattern joins in parallel: In our approach, multiple twig pattern join operations are distributed across reducers, and as many are executed simultaneously as there are H/W threads. We also implement each twig pattern join with a holistic twig pattern join algorithm, which is proven to be optimal for a certain subset of XPath expressions, to improve both computational and I/O efficiency.

Runtime workload balancing and multi-query optimization: In parallel processing, a straggling task lags the overall job execution. The native runtime scheduling scheme of the MapReduce programming model often does not work well when input data are severely skewed [26,27].
To address this issue, we instead exploit a dynamic shuffling scheme that balances workloads across reducers at runtime. We group similar twig pattern join operations into as many groups as there are reducers, such that the sum of the join costs in each group is equal or similar to that of the other groups. We then assign each group to a reducer so that multiple joins are performed at once in each reducer. To achieve this, we first estimate the cost of each twig pattern join operation before the actual joining. This estimation is simple because the worst-case I/O and CPU time complexity of the holistic twig join algorithm is linear in the sum of the sizes of the input path solutions [6,10], and the sizes of the path solutions are naturally counted when the path pattern filtering step finishes. While performing the cost estimation, we also consider the sizes of path solutions shared by multiple twig pattern queries: the more path patterns two twig patterns share, the more likely their join operations are to be grouped together. We then assign twig pattern joins to reducers at runtime so that every reducer carries approximately the same join cost.

Experimental evaluation: For performance evaluation, extensive experiments were conducted with two different datasets, i.e., XMark and Treebank. We compared our algorithms with various conventional systems, and we further evaluated the scalability and the elapsed time of DIXY by varying the number of cores, the number of queries, query types, the probability of sub-operators/predicates, and selectivity. In addition, we measured the memory usage of DIXY.

The remainder of this article is organized as follows. Section 2 describes the preliminary knowledge needed to understand our approach. We describe our approach in Sect. 3. A quantitative analysis of our approach is presented in Sect. 4. Section 5 presents the results of our extensive experiments. Related work is discussed in Sect. 6. Finally, we conclude this article in Sect. 7.

2 Preliminaries

2.1 XML streams and XPath query processing

An XML stream is modeled as an unbounded series of XML documents that arrive in real time. A single XML document is a rooted, ordered, and labeled tree in which each node corresponds to an element or a value, and edges represent either element–element or element–value relationships between two nodes. The total order on the nodes of an XML tree is obtained by a preorder traversal of the tree nodes. Figure 1 presents an example of an XML document and its tree structure. Note that in the tree structure, each element is labeled with an interval-based numbering scheme, e.g., (2, 13, 2) for element B. The interval-based numbering scheme, also known as the region numbering scheme, makes it possible to promptly identify the relationship between any two nodes in an XML tree without tree traversal. The labels are ternary tuples, i.e., (start, end, level) [12]. For any two XML tree nodes u and v, u is an ancestor of v if and only if u.start < v.start and u.end > v.end. A node u is a parent of a node v if and only if u is an ancestor of v and v.level = u.level + 1. For example, element A is a parent of element B since A's start < B's start, A's end > B's end, and A's level + 1 = B's level. Also, a node u precedes a node v in document order if and only if u.start < v.start.
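These labeling rules are easy to make concrete. The sketch below (all helper names are ours, not from the paper) assigns (start, end, level) labels by a single preorder traversal using one shared counter, and checks the ancestor/parent predicates exactly as defined above; on the document of Fig. 1 it reproduces the labels shown there.

```python
# Sketch of the region (interval-based) numbering scheme: one counter is
# advanced during the traversal; a node's start is taken on entry, its end
# on exit, and its level is the depth.

def assign_labels(node, labels=None, counter=None, level=1):
    # node is (tag, [children]); counter is a one-element list (mutable int)
    labels = [] if labels is None else labels
    counter = [0] if counter is None else counter
    counter[0] += 1
    start = counter[0]
    for child in node[1]:
        assign_labels(child, labels, counter, level + 1)
    counter[0] += 1
    labels.append((node[0], (start, counter[0], level)))
    return labels

def is_ancestor(u, v):
    # u, v are (start, end, level) labels; containment test, no traversal
    return u[0] < v[0] and u[1] > v[1]

def is_parent(u, v):
    # a parent is an ancestor exactly one level up
    return is_ancestor(u, v) and v[2] == u[2] + 1

# The document of Fig. 1 as a nested (tag, children) structure
doc = ("A", [("B", [("C", [("D", []), ("E", [])]), ("E", [("D", [])])]),
             ("B", [("C", []), ("F", [("C", []), ("G", [])])])])
labels = assign_labels(doc)
```

Running this yields, e.g., (1, 24, 1) for A and (2, 13, 2) for the first B, and is_parent confirms that A is B's parent without traversing the tree.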

Fig. 1 Sample XML document and its tree structure (nodes v1–v6 are values; elements are labeled with (start, end, level), e.g., A (1,24,1), B (2,13,2), F (17,22,3), G (20,21,4))


Query 1: /A/B[C]/F → Q1:P1: /A/B, Q1:P2: /A/B/C, Q1:P3: /A/B/F
Query 2: //B[C/D]/E → Q2:P4: //B, Q2:P5: //B/C/D, Q2:P6: //B/E
Query 3: //A[*/C/E]//G → Q3:P7: //A, Q3:P8: //A/*/C/E, Q3:P9: //A//G
Query 4: /A/B[//C]/F/* → Q4:P10: /A/B, Q4:P11: /A/B//C, Q4:P12: /A/B/F/*

Fig. 2 XPath expression samples, twig patterns, and their decomposed path patterns

Querying XML documents identifies elements whose values and structures match a given query. XPath (XML Path Language) is a basic query language for finding matches in an XML document [7]. A single XPath expression is commonly modeled as a twig pattern whose nodes are connected by either parent–child (/) or ancestor–descendant (//) axes. A predicate, denoted by '[ ]', checks conditions on either a value or a substructure. Wildcards are also used to accept any element. Most existing approaches to twig pattern matching start by decomposing a twig pattern into several linear path patterns (see Fig. 2). Once all the instances matched with the linear path patterns are found, they are joined together to produce the instances matched with the twig pattern. Numerous algorithms for twig pattern query processing have been reported in the literature, and some of them have been proven to be optimal for a certain class of twig patterns. Readers are referred to a survey of XML query processing techniques [17]. In this article, we use a simplified version of XPath query language version 1.0, defined as follows:

QUERY ::= ( LOCATION_STEP )+
LOCATION_STEP ::= AXIS NODE ( '[' PREDICATE ']' )*
NODE ::= element_name | *
PREDICATE ::= NODE ( AXIS NODE )* | element_name OP value
AXIS ::= / | //
OP ::= > | < | = | ≤ | ≥

2.2 Automata-based XML stream processing

Automata-based XML stream processing algorithms are known to be efficient for processing a large set of structural queries [42]. In these algorithms, a query index in the form of finite state automata is built before runtime, and nodes in the XML streams are then evaluated by state transitions triggered by start or end element events. To build a query index, XPath expressions are first decomposed into linear path patterns, as shown in Fig. 2. For example, the XPath expression /A/B[C]/F is decomposed into three linear path patterns: /A/B, /A/B/C, and /A/B/F.
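The decomposition step can be sketched as follows. We represent a twig directly as a nested (axis, label, children) structure rather than parsing XPath text, and emit one linear path per branching node and per leaf, which reproduces the decompositions of Fig. 2; the representation and all names are ours.

```python
# Sketch of twig-pattern decomposition into linear path patterns.
# A twig node is (axis, label, children); a path is emitted at every node
# that is a leaf or a branching point (i.e., not exactly one child).

def decompose(node, prefix=""):
    axis, label, children = node
    path = prefix + axis + label
    paths = [path] if len(children) != 1 else []
    for child in children:
        paths.extend(decompose(child, path))
    return paths

# Query 1 of Fig. 2, /A/B[C]/F: B branches to the predicate C and to F
q1 = ("/", "A", [("/", "B", [("/", "C", []), ("/", "F", [])])])
# Query 2 of Fig. 2, //B[C/D]/E
q2 = ("//", "B", [("/", "C", [("/", "D", [])]), ("/", "E", [])])
p1 = decompose(q1)
p2 = decompose(q2)
```

Here p1 is ['/A/B', '/A/B/C', '/A/B/F'] and p2 is ['//B', '//B/C/D', '//B/E'], matching P1–P6 in Fig. 2.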


Fig. 3 Example of an NFA-style query index (states annotated with path ID lists for the decomposed path patterns p1–p12 of Fig. 2, together with the hash table implementing the transition function)

A nondeterministic finite automata (NFA)-style query index that shares common prefixes has been presented in the literature [14,42]. In this approach, a single query index A is defined by a quintuple (Q, Σ, Q0, δ, F), where Q is a finite set of states, Σ is a finite set of input XML elements, Q0 is the starting state, δ is a transition function Q × Σ → Q, and F is a set of accepting states, a subset of Q whose members correspond to identifiers of the given queries. Figure 3 presents an example of the NFA-style query index built with the set of XPath expressions shown in Fig. 2. When a start element event arrives, states in the query index transition to their next states according to δ, and a snapshot of the current states is stored in a runtime stack. This is simply done by pushing the identifiers of the states onto the stack. Note that δ is typically implemented with a single hash table to guarantee constant lookup time, as shown in Fig. 3. When an end element event arrives, backtracking is performed by popping the runtime stack. The benefit of this approach is the ease of inserting and deleting queries. On the other hand, it suffers from an exponentially increasing number of state transitions when queries are deep and contain many //-axes. To address this limitation, a deterministic finite automata (DFA)-style query index has also been reported in the literature. Since only a single state transition is tracked for each element event in the DFA-style query index, the computational complexity of path pattern filtering is guaranteed to be O(1). However, building a full DFA from an NFA with n states requires a powerset computation whose time complexity is O(2^n). XMLTK avoids the powerset computation by lazily building a DFA at runtime from a given set of queries over the incoming XML streams [18]. Figure 4 presents a partial snapshot of the DFA-style query index in XMLTK, translated from the NFA in Fig. 3.
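The NFA-style index with a hash-table transition function and a runtime stack can be sketched as below. This is our own simplification, restricted to '/' axes and '*' wildcards (no '//' handling), so a set of active states is tracked instead of a single DFA state; all names are ours.

```python
# Sketch of an NFA-style query index: linear paths share common prefixes,
# delta is a hash table keyed by (state, symbol), and a runtime stack
# snapshots the active states so end-element events can backtrack.

def build_index(paths):
    delta, accept, fresh = {}, {}, 1          # state 0 is the start state
    for pid, path in sorted(paths.items()):
        state = 0
        for sym in path.strip("/").split("/"):
            if (state, sym) not in delta:
                delta[(state, sym)] = fresh
                fresh += 1
            state = delta[(state, sym)]
        accept.setdefault(state, []).append(pid)  # accepting state -> path IDs
    return delta, accept

def filter_events(events, delta, accept):
    stack, active, matches = [], {0}, []
    for kind, sym in events:
        if kind == "start":
            stack.append(active)              # snapshot for backtracking
            active = {delta[(s, t)] for s in active
                      for t in (sym, "*") if (s, t) in delta}
            for s in active:
                matches += [(pid, sym) for pid in accept.get(s, [])]
        else:
            active = stack.pop()              # end element: backtrack
    return matches

delta, accept = build_index({"p1": "/A/B", "p2": "/A/B/C", "p3": "/A/B/F"})
events = [("start", "A"), ("start", "B"), ("start", "C"),
          ("end", "C"), ("end", "B"), ("end", "A")]
matches = filter_events(events, delta, accept)
```

Because the three paths of Query 1 share the prefix /A/B, the index needs only four non-start states; on the sample events, p1 matches at B and p2 at C.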
It is noteworthy that in the DFA-style query index, the sizes of the query ID lists corresponding to a state, each stored in a bucket of another hash table, increase rather sharply, while the number of active runtime states/transitions decreases to one. Since each state in the DFA is built by combining the corresponding NFA states, a single


Fig. 4 Building a DFA-style query index by merging states in the NFA-style query index shown in Fig. 3

DFA state can have many redundant query IDs associated with accepting states in the corresponding NFA. The query IDs are kept in a hash table for constant-time lookups; however, query IDs in a single bucket still need to be scanned sequentially. Therefore, sequential scans of the query IDs can severely degrade the overall performance if the given query set is large (see Fig. 7).

2.3 Parallelization strategies for XML query processing

Several parallelization strategies for XML query processing have been reported in the literature [3,4]. These strategies are threefold: data partitioning, query partitioning, and hybrid partitioning. In the data partitioning strategy, each thread processes a certain portion of the XML data with the same query set. Within this strategy, there are two further choices of how to partition the XML data. The first is to partition each XML document into multiple chunks, as shown in Fig. 5a. The problem with this choice in XML stream processing is that XML streams are unbounded, so it is hard to partition an XML

Fig. 5 Parallelization strategies for XML query processing. a Element-level partitioning. b Document-level partitioning. c Query-level partitioning



document until the end of the document is met. Moreover, when an XML tree is partitioned, the tree structure of an XML chunk can be lost or can overlap with other XML chunks. Another problem is imbalanced selectivities: the selectivity of each element in the chunks is diverse, so workload balance across cores tends to be fragile. The other choice is to let each core work on one document at a time (see Fig. 5b). However, both the sizes of documents and the selectivities of elements inside them vary, so multiple threads must be carefully scheduled to distribute workloads fairly across cores. In the query-level partitioning strategy, on the other hand, a given query set is partitioned into multiple subsets, and each core processes a given XML document with one of the subsets. However, workloads across cores may also not be well balanced in this strategy, since each query subset constitutes a different set of query patterns and thus produces query indexes of various sizes and shapes. The hybrid partitioning strategy partitions both the query set and the XML document; it likewise requires deciding how to partition both while guaranteeing workload balance across cores.

3 Our approach

3.1 Architecture overview

In this section, we provide an overview of our system architecture and then describe in detail how it processes a large collection of XPath expressions over XML streams. Figure 6 illustrates the overall architecture of DIXY. First, users' XPath expressions are delivered to the system before XML streams are processed. The XPath expressions are then decomposed into linear path patterns, as shown in Fig. 2, and redundant path patterns are merged into a single path pattern. Afterwards, an NFA-style query index is built from the set of linear path patterns and stored in shared memory. This NFA-style query index is examined by H/W threads in order to build multiple DFA-style query indexes at runtime. When an XML stream flows

Fig. 6 Architecture overview (a SAX parser feeds element events to per-core path-filtering engines, each with its own DFA matching table backed by the shared NFA query index; mappers tag path solutions with reducer IDs, and reducers perform holistic twig joins to produce the final answers)


in, each XML document is parsed, and its events, i.e., startElement and endElement events, are transferred to multiple path pattern filtering engines run by multiple H/W threads. Each path pattern filtering engine picks out the XML elements that match the linear path patterns. The path pattern filtering procedure on each core works similarly to XMLTK, i.e., it lazily builds a DFA-style query index at runtime from an NFA-style query index built over the given set of XPath expressions. When a startElement event arrives, each thread checks whether the state transition δ that corresponds to the element event exists in its DFA-style query index. If it exists, the thread moves the current state to the next state according to δ. If there is no state-transition information for the event at the current state, the thread examines whether corresponding state transitions exist in the NFA-style query index; it then creates and appends a new state to the DFA-style query index and performs the state transition in the DFA-style index. The advantage of this scheme is that it greatly reduces the cost of the powerset computation required to build a full DFA-style query index, since DFA states are built at runtime and restricted to the given XPath expressions. Furthermore, when we define the states and transitions of the DFA-style query index run by each thread, the corresponding path IDs are also partitioned, so the path pattern filtering procedure can be performed independently on each core. This also improves path filtering performance, as path ID scans are split into multiple sets that are run by multiple threads separately. Note that since all the DFA-style query indexes have the same states and transitions but different path ID lists, the workload assigned to each core is naturally balanced. After the path pattern filtering procedure, path solutions and their corresponding path IDs are merged and stored in shared memory.
When merging the path solutions for a distinct path pattern, we also compute the size of the merged path solutions. After that, we run our twig pattern join scheduling algorithm to determine to which reducer each twig pattern join is assigned (see Algorithm 4). Multiple twig pattern joins are then performed on the path solutions in an in-memory MapReduce model that consists of only a single MapReduce job. In this model, mappers tag reducer IDs onto path solutions in order to determine which reducer is given the solutions for its twig pattern joins. Afterwards, each reducer performs multiple twig pattern joins with the given path solutions and returns the final answers. A brief overview of our algorithm is presented in Algorithm 1. In the algorithm, we run as many threads as the number of cores n available in the system. We call four functions. BUILD-INDEX is called before runtime to create an NFA-style query index from a given set of decomposed path patterns Q_de. PARALLEL-PATH performs the path filtering procedure over XML streams and also lazily builds the DFA-style query indexes at runtime. Note that the path solutions that each thread filters out are merged and stored in shared memory; merging is simply done through a hash table whose key is a path ID and whose value is a list of the corresponding path solutions. OPTIMIZE is a scheduling algorithm that assigns multiple twig pattern joins to R reducers for workload balancing, based on a heuristic approximation algorithm called first-fit decreasing. MRTWIG is a function implemented in an in-memory MapReduce model that processes multiple twig pattern queries with the path solutions that the PARALLEL-PATH calls store in shared memory. Note that we do not describe the details of BUILD-INDEX since it is exactly the same as in YFilter [14].
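The OPTIMIZE step can be sketched as follows. Following the paper's cost model, each join's cost is the (already counted) total size of its input path solutions; we render first-fit decreasing in its common load-balancing form, assigning joins in decreasing cost order to the currently lightest reducer. The function name, variable names, and the example costs are ours.

```python
# Sketch of cost-based twig-join scheduling across R reducers using a
# decreasing-order greedy (a common rendering of first-fit decreasing).

def optimize(join_costs, R):
    """join_costs: {twig_id: estimated cost}; returns (groups, loads)."""
    groups = [[] for _ in range(R)]
    loads = [0] * R
    for twig, cost in sorted(join_costs.items(), key=lambda kv: -kv[1]):
        r = loads.index(min(loads))   # currently lightest reducer
        groups[r].append(twig)
        loads[r] += cost
    return groups, loads

# hypothetical per-join costs (sums of input path-solution sizes)
groups, loads = optimize({"q1": 8, "q2": 7, "q3": 6, "q4": 5, "q5": 4}, 2)
```

With these costs, the two reducers end up with loads 17 and 13, close to the ideal 15 each; the paper additionally biases grouping toward joins that share path solutions, which this sketch omits.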



Algorithm 1 DIXY overall procedure
Input: a set of XPath expressions Q, the number of cores n
Output: Sol: a set of (query_id, results)
1: Q_de ← decompose Q into path patterns;
2: MAP ← build a mapping table of path IDs and their corresponding query IDs;
3: HT_NFA ← BUILD-INDEX(Q_de);  ▷ build an NFA-style query index HT_NFA
4: initialize n threads;
5: while there is a document d in input stream S do
6:    Psol ← PARALLEL-PATH(HT_NFA, d) in each thread;
7:    merge path solutions;
8:    S[] ← compute the solution size for each path pattern;
9:    assign twig pattern joins to R reducers by calling OPTIMIZE(MAP, S[]);
10:   Sol ← MRTWIG(Psol);
11:   return Sol
12: end while

3.2 Parallel path pattern filtering

For path pattern filtering over XML streams, each core works with three data structures: a DFA-style query index that guides state transitions for every startElement event, a runtime stack that holds previous states for backtracking on endElement events, and a matching table. The matching table is implemented as a hash table of triplets (DFA state ID, list of NFA state IDs, list of path pattern IDs), used to find the NFA state IDs or path pattern IDs that correspond to a specific DFA state. In our approach, multiple DFA-style query indexes are built while processing XML streams, referring to a given NFA-style query index. When filtering XML streams with a DFA-style query index at runtime, multiple NFA states are merged into a single new DFA state whenever that state does not yet exist in the DFA-style query index. Figure 7 presents an example of a DFA-style query index built at runtime, derived from the NFA-style query index of Fig. 3, as in the original version of XMLTK [18]. In the example, the DFA state identified by 2 is created by merging multiple states of the NFA-style query index, i.e., 2, 5, 6, 11,

Fig. 7 Example of a DFA-style query index (each DFA state maps, through the matching table, to a list of NFA state IDs and a list of path IDs, and collects the path solutions matched at that state)


Fig. 8 Path pattern filtering in DIXY (two cores hold DFA-style query indexes with identical states and transitions, but the NFA state IDs and path IDs behind each DFA state are split between the two matching tables, e.g., {1, 10} on core 1 and {5, 14} on core 2)

14, and 16. Note that in the NFA-style query index, the states identified by 2 and 6 are accepting states, where element events match the path patterns identified by p1, p4, and p10. In other words, when the current state in the DFA-style query index is 2, the element event matches the path patterns p1, p4, and p10. Note that we build as many DFA-style query indexes as there are cores for parallel path filtering. This is achieved by partitioning the NFA states that would be merged into a single DFA state into as many sets as there are cores. Figure 8 presents an example of parallel DFA-style query indexes and their matching tables, derived from the NFA-style query index of Fig. 3. In the figure, we assume that path pattern filtering is performed in parallel by two H/W threads. Contrary to the original form of the DFA-style query index in Fig. 7, the NFA state IDs that correspond to a single DFA state are partitioned, splitting the matching table into two tables on the basis of NFA IDs. For example, the NFA states 1, 5, 10, and 14 that correspond to DFA state 1 are partitioned into two groups, i.e., {1, 10} and {5, 14}. This partitioning is simply done in a round-robin fashion, and the corresponding path pattern IDs are partitioned along with them. It is noteworthy that we do not need to partition all the NFA states for all DFA states in the matching table: we just select the first DFA state that holds at least as many NFA states as there are cores, excluding the starting state identified by 0. In Fig. 8, we select the NFA states that correspond to DFA state 1 for partitioning, since it is the first DFA state whose corresponding NFA states are no fewer than the number of cores. Therefore, the NFA states 1, 5, 10, and 14 are partitioned into two groups, and each group is allocated to one matching table.
After that, each thread creates the next state of DFA state 1 by referencing the NFA-style query index with its own NFA states, i.e., {1, 10} or {5, 14}. As a result, each thread builds its own matching table without any communication between threads. Algorithm 2 describes in detail how we perform parallel path pattern filtering with index creation.
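The round-robin split described above is straightforward; the sketch below (names are ours) partitions the NFA state IDs behind one DFA state into as many groups as cores, reproducing the paper's example split.

```python
# Sketch of the round-robin partitioning of NFA state IDs across cores:
# each core then expands its lazy DFA only from its own share of states.

def split_round_robin(nfa_states, num_cores):
    groups = [[] for _ in range(num_cores)]
    for i, state in enumerate(nfa_states):
        groups[i % num_cores].append(state)
    return groups

# NFA states {1, 5, 10, 14} behind DFA state 1, split across two cores
parts = split_round_robin([1, 5, 10, 14], 2)
```

This yields [1, 10] for core 1 and [5, 14] for core 2, matching the split shown in Fig. 8.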



Algorithm 2 PARALLEL-PATH(HT_NFA, d)
M[nc][cur_id]: lookup results of the matching table M for current state cur_id on core nc
HT_DFA[nc]: a hash table that implements the DFA-style query index for core nc
P[nc]: path solutions (twigID, pathID, labels, path count) on core nc
1: initialize P[nc] and runstack[nc];
2: cur_id ← ID of the starting state;
3: while not the end of document d do
4:    symbol ← an incoming element event of d;
5:    if symbol is startElement then
6:       if there is a state transition for (cur_id, symbol) in HT_DFA[nc] then
7:          next_id ← next state for (cur_id, symbol) in HT_DFA[nc];
8:          if next_id is an accepting state then
9:             insert path solutions for next_id into P[nc];
10:         end if
11:         insert next_id into runstack[nc];
12:      else
13:         initialize a list results_NFA;  ▷ result state IDs acquired by NFA lookups
14:         for each NFA ID id_NFA in M[nc][cur_id] do
15:            if there is a state transition for (id_NFA, symbol) in HT_NFA then
16:               insert the next-state info into results_NFA;
17:            end if
18:            if there is a state transition for (id_NFA, *) in HT_NFA then
19:               insert the next-state info into results_NFA;
20:            end if
21:         end for
22:         if results_NFA is not NULL then
23:            if the matching table has not been split && |results_NFA| ≥ num_core then
24:               select IDs in results_NFA in a round-robin fashion;
25:            end if
26:            insert results_NFA into M[nc][new_id_DFA];
27:            insert ((cur_id, symbol), new_id_DFA) into HT_DFA[nc];
28:            insert path solutions for new_id_DFA into P[nc];
29:            runstack[nc].push(new_id_DFA);
30:         end if
31:      end if
32:   else
33:      cur_id ← runstack[nc].pop();
34:   end if
35: end while
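A runnable, single-core rendering of the lazy expansion in Algorithm 2 is sketched below. It is our own simplification: '/' axes and '*' wildcards only, one thread, and no matching-table split, so a DFA miss is resolved by consulting the NFA transition table and caching the new state. All names, and the hand-built NFA, are ours.

```python
# Sketch of lazy DFA construction over element events: delta_dfa is filled
# on demand; 'matching' plays the role of the matching table (DFA state ->
# set of NFA state IDs) and 'ids' deduplicates NFA-state sets.

def lazy_filter(events, delta_nfa, accept):
    delta_dfa = {}                        # (dfa_id, symbol) -> dfa_id
    matching = {0: frozenset({0})}        # dfa_id -> NFA state IDs
    ids = {frozenset({0}): 0}             # NFA-state set -> dfa_id
    stack, cur, hits = [], 0, []
    for kind, sym in events:
        if kind == "end":
            cur = stack.pop()             # backtrack on endElement
            continue
        stack.append(cur)
        if (cur, sym) not in delta_dfa:   # DFA miss: expand from the NFA
            nxt = frozenset(delta_nfa[(s, t)] for s in matching[cur]
                            for t in (sym, "*") if (s, t) in delta_nfa)
            if nxt not in ids:
                ids[nxt] = len(ids)
                matching[ids[nxt]] = nxt
            delta_dfa[(cur, sym)] = ids[nxt]
        cur = delta_dfa[(cur, sym)]
        hits += [pid for s in matching[cur] for pid in accept.get(s, [])]
    return hits, delta_dfa

# NFA for p1: /A/B and p2: /A/B/C (hand-built, as in Sect. 2.2)
delta_nfa = {(0, "A"): 1, (1, "B"): 2, (2, "C"): 3}
accept = {2: ["p1"], 3: ["p2"]}
events = [("start", "A"), ("start", "B"), ("end", "B"), ("start", "B"),
          ("start", "C"), ("end", "C"), ("end", "B"), ("end", "A")]
hits, delta_dfa = lazy_filter(events, delta_nfa, accept)
```

The second B element reuses the cached (1, 'B') transition instead of touching the NFA again, so delta_dfa ends up with only three entries while p1 is reported twice.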

3.3 Parallel twig pattern joins

3.3.1 In-memory parallelization

MapReduce was originally devised as a programming model for massively parallel processing in a shared-nothing architecture [13]. The input of a MapReduce-based program is a list of (key1, value1) pairs, and the function map() is applied to each key–value pair to compute intermediate results, i.e., (key2, value2) pairs. The intermediate key–value pairs are then grouped by key2 values, i.e., (key2, list(value2)). For each key2, the function reduce() reads the list of all values, list(value2), to perform aggregation. In-memory MapReduce is a variant of the MapReduce model, devised to provide intra-node parallelism on multicores [41,43]. In the in-memory MapReduce model,


Multi-query processing of XML data streams on multicore

each task is allocated to a thread rather than a node, and it processes a non-overlapping portion of the input data. Since all the input data are partitioned and assigned to threads, data communication between threads is minimized. In the model, a runtime scheduler periodically monitors the status of threads and memory usage in order to dynamically assign tasks to multiple threads. A main difference between the original MapReduce and the in-memory MapReduce model is that, by virtue of shared memory, data can be randomly accessed through pointers and shared by multiple threads. Furthermore, threads are maintained in a thread pool to save the cost of thread creation. When the scheduler needs to assign a new task, it uses an idle thread from the pool rather than creating a new one, and threads that finish their tasks are pushed back into the pool. Thus, we do not need to consider thread creation and destruction when running multiple threads in the model. Lastly, the in-memory MapReduce model allows adjusting the granularity of the data that each task accesses when fetching data. This helps improve data locality in the L1 cache. It also provides a feature that pre-fetches data for the next task into the L2 cache.
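The flow described above — partitioning input across pooled threads, grouping intermediate pairs in shared memory, then reducing per key — can be sketched as below. This is a minimal illustration of the model, not Phoenix++'s actual API; run_mapreduce and its signature are hypothetical.

```cpp
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Minimal in-memory MapReduce: the input is partitioned across threads (one
// non-overlapping slice per thread), intermediate pairs are grouped in
// shared memory, and reduce aggregates each key's value list.
template <typename K, typename V>
std::map<K, V> run_mapreduce(
    const std::vector<std::string>& input, unsigned num_threads,
    void (*map_fn)(const std::string&, std::vector<std::pair<K, V>>&),
    V (*reduce_fn)(const K&, const std::vector<V>&)) {
    std::map<K, std::vector<V>> groups;   // shared intermediate groups
    std::mutex m;
    std::vector<std::thread> pool;
    for (unsigned t = 0; t < num_threads; ++t) {
        pool.emplace_back([&, t] {
            std::vector<std::pair<K, V>> local;       // thread-local output
            for (size_t i = t; i < input.size(); i += num_threads)
                map_fn(input[i], local);
            std::lock_guard<std::mutex> lock(m);      // merge once at the end
            for (auto& kv : local) groups[kv.first].push_back(kv.second);
        });
    }
    for (auto& th : pool) th.join();
    std::map<K, V> out;
    for (auto& g : groups) out[g.first] = reduce_fn(g.first, g.second);
    return out;
}
```

A real runtime would additionally reuse the pool across jobs and tune the per-task slice size for cache locality, as the text notes; the sketch keeps only the partition/group/reduce skeleton.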

3.3.2 Twig pattern joins

DIXY features simultaneous processing of multiple twig pattern joins in parallel. To implement this feature, we employ the in-memory MapReduce programming model. Algorithm 3 explains how we process multiple twig pattern joins simultaneously in parallel. Twig pattern joins in DIXY start with the results of path pattern filtering stored in shared memory. In the MAP() function, we first read the path filtering results and then find the XPath expressions that contain the linear path patterns identified by path_id. This works with a hash table, MAP, that maps query IDs to their corresponding path IDs. The hash table is built when a given set of XPath expressions is decomposed in Algorithm 1. Afterward, we traverse the Allocation Map in order to obtain the IDs of the reducers that actually perform twig pattern joins with the path solutions (line 3 in MAP()). The Allocation Map is constructed by calling the function OPTIMIZE in Algorithm 1. Finally, we emit the mapped output, which consists of a composite key, a pair of reducer_id and query_id, and its corresponding path solutions (lines 4–6 in MAP()). When the mapped outputs are shuffled for delivery to reducers, only reducer_id is considered, so that mapped outputs which share the same reducer_id are transferred to the same reducer. Note that mapped outputs are grouped and sorted in the order of (reducer_id, query_id), since MapReduce sorts mapped outputs by key values to group them before performing aggregations in the Reduce stage. In the REDUCE() function, we actually perform twig pattern joins with the path solutions grouped by query IDs. If an XPath expression is a linear path pattern that does not form a twig, the path pattern itself is a single query. This case does not require performing a twig pattern join (lines 2–3 in REDUCE()). Otherwise, we perform a twig pattern join for each XPath expression (lines 5–6 in REDUCE()).
In our approach, we exploit a holistic twig pattern join technique called TwigStack [6], which is proven to guarantee both I/O and computational optimality for a certain subset of XPath expressions. However, other twig pattern join techniques can be applied without loss of generality.
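The composite-key shuffle described above — partitioning by reducer_id alone while keeping mapped outputs sorted by the full (reducer_id, query_id) key — can be sketched as follows. MappedOut and shuffle are illustrative names, and a path solution is abbreviated here to a single path_id.

```cpp
#include <algorithm>
#include <map>
#include <vector>

// One mapped output of Algorithm 3's MAP(): a path solution tagged with the
// composite key (reducer_id, query_id).
struct MappedOut { int reducer_id; int query_id; int path_id; };

// Partition mapped outputs by reducer_id only, then sort each reducer's
// bucket by query_id (reducer_id is equal within a bucket) so that path
// solutions of the same query are adjacent when the reducer runs.
std::map<int, std::vector<MappedOut>> shuffle(std::vector<MappedOut> mapped) {
    std::map<int, std::vector<MappedOut>> buckets;
    for (const auto& m : mapped) buckets[m.reducer_id].push_back(m);
    for (auto& b : buckets)
        std::sort(b.second.begin(), b.second.end(),
                  [](const MappedOut& a, const MappedOut& c) {
                      return a.query_id < c.query_id;
                  });
    return buckets;
}
```

Each bucket then corresponds to one reducer's input stream, grouped exactly as REDUCE() expects.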



[Figure omitted: mappers tag each (Query ID, Path ID, Path solutions) tuple with a reducer ID, the tuples are shuffled by reducer ID, and two reducers perform holistic twig joins, each emitting its (Query ID, Path solutions) results.]

Fig. 9 Parallel twig pattern join

Figure 9 illustrates an example of multiple twig pattern joins in our in-memory MapReduce model. Each mapper reads a list of (path_id, path_solution) pairs and then tags reducer IDs onto each path_solution so as to transfer it to the relevant reducer. Mapped outputs are then shuffled by reducer ID and transferred to the reducers. As mentioned earlier, MapReduce sorts data for grouping before applying reduce functions; in the example, path solutions are grouped and sorted by query ID. The first reducer gets the path solutions that correspond to query IDs 1 and 3, while the second reducer gets those that correspond to query IDs 2 and 4. Afterward, each reducer performs twig pattern joins with the path solutions that share the same query ID. Finally, the results are emitted from the two reducers. Note that, for better understanding, we show query IDs next to their corresponding path IDs in the figure.

Algorithm 3 MRTWIG
Function MAP(input[])
input[]: key–value pairs (path_id, path_solution)
1: for each path_id pi in input[] do
2:   query_id[] ← look up the corresponding query IDs for pi
3:   reducer_id[] ← AllocationMap(query_id[])
4:   for i = 0; i < |query_id[]|; i++ do
5:     emit((reducer_id[i], query_id[i]), path_solution)
6:   end for
7: end for
Function REDUCE((reducer_id, query_id), path_solution[])
1: for each input item do
2:   if query_id is a single path pattern query then
3:     write(query_id, path_solution)
4:   else
5:     join_results ← HOLISTIC-JOIN(query_id, path_solution[])
6:     write(query_id, join_results)
7:   end if
8: end for

3.4 Optimizations

We also endeavor to further optimize our algorithms by reducing redundant computations. First, contrary to the traditional MapReduce model, which requires data copies and transfers between mappers and reducers, data items are delivered via pointers, so no redundant data copies are required. This also helps reduce the communication cost of delivering mapped outputs to reducers. For example, path solutions in shared memory are accessed by both map and reduce functions through pointers. Second, redundant path patterns that appear in multiple XPath expressions are merged into a single distinct path pattern. This improves the performance of path pattern filtering, since only distinct path patterns need to be processed in the procedure. Third, unlike the traditional TwigStack algorithm [6], we do not need to read all elements for every query node in a twig pattern during twig pattern joins. Since we perform path pattern filtering first, every element delivered to a join already satisfies some path pattern in the given query set. Therefore, far fewer elements are read than in the original TwigStack algorithm, saving both I/O and computational cost during twig pattern joins. Finally, in our approach, query nodes which are neither leaf nodes nor branching nodes and which are not projected to appear in final solutions can be ignored during query processing. For example, in query 2 in Fig. 3, query node C is not projected, so we do not read C elements during query processing. The twig pattern join is performed only with the B, D, and E elements that correspond to the linear path patterns //B, //B/C/D, and //B/E, respectively.

In the MapReduce programming model, map and reduce tasks are basically scheduled in FIFO (first in, first out) order. This scheduling scheme is not suitable when only a few reducers work simultaneously, because grouped input data for reducers tend to vary in size, leaving reducers with different workloads. In that case, the slowest reducer drags down the overall performance of the MapReduce program. To balance workloads across reducers, we assign multiple twig pattern joins to a single reducer such that every reducer has approximately the same workload; in other words, the sum of the costs of the twig pattern joins assigned to each reducer is approximately the same as that of every other reducer. This scheduling issue is a form of the job-shop scheduling problem, an optimization problem in which jobs of various sizes are assigned to multiple machines while minimizing the makespan, i.e., the total length of the schedule [22]. The problem is NP-hard, so we solve it with a heuristic based on a greedy approximation algorithm.
Meanwhile, the CPU and I/O complexity of a holistic twig pattern join algorithm is known to be O(|Input| + |Output|) for a twig pattern query Q, where |Input| + |Output| is the sum of the sizes of the input and output data for Q [6,10]. We consider only |Input| when computing the join cost, since we do not know the output size of a twig pattern join in advance.



Algorithm 4 presents how we implement the optimization techniques mentioned above. The algorithm requires the size of each path solution for computing |Input|, the sum of the sizes of the path solutions that participate in a twig pattern join. In the algorithm, we first select the path IDs whose solution size is zero. This is simply achieved by checking which path patterns are not enrolled in the path solutions emitted from the mappers. We then delete all the twig pattern queries in which those path patterns are enrolled (lines 1–2). The rationale is that if there is no solution for a certain path pattern, the result of any twig pattern join that consumes it is empty. Therefore, we can ignore the corresponding path solutions, further saving I/O and computational cost. After that, we sort MAP, a mapping table of path IDs and their corresponding query IDs, in descending order of join cost (line 3). The join costs are obtained by computing |Input|, the sum of the sizes of the path solutions that participate in each twig pattern join. With the sorted MAP, the algorithm assigns the twig pattern join with the highest cost (the first row in MAP) to the reducer with the lowest accumulated cost. Since every reducer's cost is 0 in the beginning, we select one at random the first time. The algorithm then updates the overall cost of that reducer by adding the cost of the assigned twig pattern join. This procedure runs until all twig pattern joins are assigned to reducers. It is straightforward that the algorithm consists of sorting |Q| query IDs and assigning |Q| joins to |R| reducers. Thus, its time complexity is O(|Q| · log|Q| + |Q| · |R|). In our approach, both |Q| and |R| are small enough that Algorithm 4 can simply be run in memory.
Algorithm 4 OPTIMIZE
Input: MAP, S[]
Output: Allocation Map
1: p[] ← select path IDs whose size is zero;
2: find and delete all the information about queries that contain at least one of p[] in MAP;
3: sort rows in MAP in descending order of path solution size;
4: cost[R] ← all 0;
5: while MAP is not empty do
6:   select a twig pattern query qi in MAP;
7:   select a reducer ID rj that has the lowest cost in cost[];
8:   put (qi, rj) into Allocation Map;
9:   cost[rj] ← cost[rj] + qi.cost();
10:  delete qi from MAP;
11: end while
12: return Allocation Map;
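The greedy loop in lines 5–11 of Algorithm 4 is essentially the longest-processing-time heuristic: sort the joins by descending cost, then repeatedly hand the costliest remaining join to the least-loaded reducer. A sketch with a min-heap of reducer loads (balance and its types are illustrative, not the paper's implementation):

```cpp
#include <algorithm>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Assign |Q| twig-join costs to num_reducers reducers so that per-reducer
// total costs stay balanced: sort costs descending, then always hand the
// next join to the currently least-loaded reducer.
std::vector<int> balance(const std::vector<long>& costs, int num_reducers) {
    std::vector<int> idx(costs.size());
    for (size_t i = 0; i < idx.size(); ++i) idx[i] = (int)i;
    // Sort join indices by descending cost (line 3 of Algorithm 4).
    std::sort(idx.begin(), idx.end(),
              [&costs](int a, int b) { return costs[a] > costs[b]; });
    // Min-heap of (current load, reducer id): the top is always the
    // least-loaded reducer (line 7 of Algorithm 4).
    typedef std::pair<long, int> Load;
    std::priority_queue<Load, std::vector<Load>, std::greater<Load>> heap;
    for (int r = 0; r < num_reducers; ++r) heap.push(Load(0, r));
    std::vector<int> assignment(costs.size());
    for (size_t k = 0; k < idx.size(); ++k) {
        Load top = heap.top();
        heap.pop();
        assignment[idx[k]] = top.second;               // give join to reducer
        heap.push(Load(top.first + costs[idx[k]], top.second));
    }
    return assignment;
}
```

Zero-cost joins would already have been pruned by lines 1–2 of Algorithm 4 before this step runs.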

4 Analysis

We provide the total cost model of our algorithms in this section. Note that we assume that the cost of every memory access is constant, denoted by cm. In our cost model, the total cost of our algorithms, Ctotal, is the sum of the cost of filtering path patterns, Cpath, and the cost of twig pattern joins, Ctwig:

Ctotal = Cpath + Ctwig    (1)

Path pattern filtering performs three steps: (1) state transitions for incoming element events, (2) appending path solutions to relevant DFA states, and (3) getting path IDs for the path patterns matched by the current incoming element event when arriving at accepting states. Therefore, the cost of path pattern filtering, Cpath, is the sum of the costs of the three steps:

Cpath = Ctransition + Cwrite + Cretrieve    (2)

State transitions occur whenever element events come in, and an element event is either a startElement or an endElement. Therefore, the cost of state transitions, Ctransition, is the sum of the cost of all state transitions triggered by startElement events and the cost of all state transitions triggered by endElement events:

Ctransition = {cost for startElement events} + {cost for endElement events}    (3)

When a startElement event occurs, our algorithm performs a hash-table lookup to find the next DFA state and then pushes the current state information onto a runtime stack. Hash-table lookups and stack pushes are performed in constant time, denoted by cm. In addition, an XML element is composed of a start tag and an end tag, so the number of startElement events is equal to the number of elements, denoted by |e|. Therefore, the cost of all state transitions triggered by startElement events is:

{cost for startElement events} = {cost for hash-table lookups} + {cost for stack pushes}
  = cm × {the number of startElement events} + cm × {the number of startElement events}
  = cm · |e| + cm · |e| = 2cm · |e|    (4)

When an endElement event occurs, our algorithm only calls pop() on the runtime stack, which is also done in constant time. Thus, the cost of processing endElement events is:

{cost for endElement events} = {cost for stack pops} = cm × {the number of endElement events} = cm · |e|    (5)

Therefore, the cost of state transitions is:

Ctransition = {cost for startElement events} + {cost for endElement events} = 3cm · |e|    (6)

We now define the cost of appending path solutions to relevant DFA states. This action appends path solutions to DFA states so that the solutions can be delivered to the next stage, i.e., twig pattern joins, when the state we visit is an accepting state. This action occurs at most once per startElement event. Therefore, the cost of this step is:

Cwrite = {cost for writing path solutions in memory} = cm × {the number of startElement events} = cm · |e|    (7)

Getting path IDs for the path patterns matched by the current incoming element event is triggered by arriving at an accepting state. The path IDs for a certain accepting state are stored in a linked list, so traversing all items in a linked list of size L takes O(L) time. Hence,

Cretrieve = {cost for retrieving paths stored in visited accepting states} = {cost for retrieving linked lists} = cm × |S| × {the number of paths in a state}    (8)

where |S| is the total number of states in our DFA-style query index and |P| is the number of distinct path patterns in a given query set Q. All the subsets of path patterns partitioned into multiple DFA-style query indexes are pairwise disjoint. Moreover, path IDs can be redundantly stored in different accepting states within a single DFA-style query index, since multiple NFA states can be merged into a single DFA state. Therefore, the number of path patterns attached to a single state is bounded by:

{the number of paths in a state} ≤ |P| / n    (9)

where n is the number of cores. Thus, Cretrieve is bounded by:

Cretrieve ≤ cm · |S| · |P| / n    (10)

Finally, the overall cost of path pattern filtering is bounded by:

Cpath = Ctransition + Cwrite + Cretrieve
  ≤ 3cm · |e| + cm · |e| + cm · |S| · |P| / n
  ≤ cm · (4|e| + |S| · |P| / n)    (11)

We now analyze the cost of twig pattern joins in our approach. Twig pattern joins are performed in two steps. We first schedule the multiple twig pattern joins by assigning them to reducers such that all reducers are guaranteed approximately the same workload; the cost of this step is denoted by Coptimize. The twig pattern joins are then performed by the reducers; the cost of this step is denoted by Creducer. Thus, the cost of twig pattern joins is defined as:

Ctwig = Coptimize + Creducer    (12)

Coptimize is the cost of running Algorithm 4. As described in Sect. 3.4, this algorithm is composed of two steps: sorting the XPath expressions in a given query set Q in descending order of the sum of their path solution sizes, and then assigning the twig pattern joins to the R reducers. The complexity of this algorithm is thus:

Coptimize = {sorting cost} + {testing cost} = cm · |Q| log |Q| + cm · (|Q| · |R|) = cm · |Q| · (log |Q| + |R|)    (13)

where |Q| is the number of XPath expressions given by users. The cost of running the reducers, Creducer, is bounded by the reducer that takes the longest time for its twig pattern joins, because all reducers work in parallel. Furthermore, each reducer is assigned approximately |Q|/|R| twig pattern join operations. Consequently, Creducer is computed by:

Creducer = max_{Ri ∈ R} Cjoin(Ri) ≈ (Σ_{j=1}^{|Q|} Cjoin(qj)) / |R|,  qj ∈ Q    (14)

where Cjoin(qj) is the cost of a twig pattern join for an XPath expression qj, computed as the sum of the sizes of the path solutions that participate in the join. Therefore, the total cost of twig pattern joins is:

Ctwig = Coptimize + Creducer ≈ cm · |Q| · (log |Q| + |R|) + (Σ_{j=1}^{|Q|} Cjoin(qj)) / |R|,  qj ∈ Q    (15)
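As a sanity check, the bounds above are straightforward to evaluate numerically. The sketch below normalizes cm to 1 and, since the derivation does not fix a logarithm base, assumes base 2; the function names are illustrative.

```cpp
#include <cmath>
#include <numeric>
#include <vector>

// Path-filtering bound of Eq. (11), with cm normalized to 1:
// Cpath <= cm * (4|e| + |S||P|/n).
double cpath_bound(double e, double S, double P, double n) {
    return 4.0 * e + S * P / n;
}

// Twig-join cost of Eq. (15), with cm = 1 and log taken base 2 (an
// assumption; the paper leaves the base unspecified):
// Ctwig ~= |Q|(log|Q| + |R|) + (sum of Cjoin(qj)) / |R|.
double ctwig_cost(const std::vector<double>& join_costs, double R) {
    double Q = (double)join_costs.size();
    double optimize = Q * (std::log2(Q) + R);
    double reduce =
        std::accumulate(join_costs.begin(), join_costs.end(), 0.0) / R;
    return optimize + reduce;
}
```

For instance, 100 elements, 10 DFA states, 8 distinct paths, and 4 cores give a Cpath bound of 4·100 + 10·8/4 = 420 memory accesses.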

5 Experiments

5.1 Experimental setup

We evaluated our algorithms against three major XML stream processing engines, i.e., YFilter [14], XMLTK [18], and MXQuery [16], in terms of speedup and scalability while varying the number of cores and the number of XPath expressions. We implemented our algorithms in GNU C++ version 4.6.3, and all experiments were carried out on a Linux machine equipped with an AMD FX-8120 3.1 GHz CPU with 8 cores and 8 GB of memory.


Table 1 Statistics of dataset and query set

Query set:
- Average query depth: 6–10 (8 by default)
- Wildcard (*) probability: 0–100% (10% by default)
- //-axis probability: 0–100% (10% by default)

Data set:
- Average document depth: XMark: 5, Treebank: 7
- # of elements in each document on average: XMark: 77, Treebank: 250
- # of distinct paths on average: XMark: 344, Treebank: 5391

For path pattern filtering, we also implemented two versions of our algorithms: N-DIXY and D-DIXY. D-DIXY is the set of algorithms described in this article, and N-DIXY is a set of algorithms that uses multiple NFA-style query indexes, developed to measure the performance of D-DIXY against partitioned NFA-style query indexes. In N-DIXY, each thread filters XML event streams with its own NFA-style query index rather than with the DFA-style query index of D-DIXY. To keep the path information that tracks from the starting state to each state while partitioning a single NFA-style query index into multiple NFA-style indexes in N-DIXY, we assigned prefix-based labels to each state. We then recovered previous states by decoding the labeled values when building each NFA-style query index. To build the multiple NFA-style query indexes, we partitioned the NFA states into multiple pairwise disjoint sets. For holistic twig pattern joins, we implemented the TwigStack algorithm [6] on Phoenix++, an in-memory implementation of the MapReduce programming model [41]. Note that other holistic twig pattern join algorithms are also applicable without loss of generality, since our algorithms work independently of the twig pattern join algorithm. We also reimplemented the YFilter algorithm, originally written in Java, in C++ for a fair comparison. We used two synthetic XML datasets, XMark and Treebank, to generate XML streams. XMark is data-oriented, with a shallow tree structure and high fan-out [38]. In contrast, the Treebank dataset has a deep structure with many recursions, involving many distinct root-to-leaf path patterns [31]. Using the XMLGen generator [38] and the Oxygen XML editor [40] with the DTDs of the two datasets, we generated 1000 stream documents between 15 and 56 KB in size for each dataset. We also created up to 10 million XPath expressions with the query generator provided with YFilter [14].
The statistics of our dataset and query set are presented in Table 1.

5.2 Experimental results

5.2.1 Path filtering

Figure 10 presents the performance of our algorithms in path pattern filtering, compared to XMLTK [18], YFilter [14], and MXQuery [16]. A line labeled query-level in




Fig. 10 Speedup comparison in path pattern filtering. a 1000 XMark documents with 8 cores. b 1000 Treebank documents with 8 cores. c 500,000 path queries over XMark dataset. d 500,000 path queries over Treebank dataset

the figure denotes the speedup of another simple partitioning strategy that randomly groups XPath expressions into multiple subsets and builds multiple NFA-style query indexes from them before runtime. The performance of each algorithm was measured as speedup, the ratio of its throughput to that of YFilter. As already known in the literature, XMLTK outperformed YFilter when the number of path queries was large. However, neither algorithm benefited at all from the multiple H/W threads supported by a multicore CPU. As the number of path queries increased in Fig. 10a, b, our algorithms steadily outperformed the other algorithms. Specifically, D-DIXY exhibited super-linearity when processing 10 million path queries. N-DIXY was also better than the conventional algorithms, but not as good as D-DIXY. The reason is that N-DIXY inherits from its NFA-style query index the need to trace too many states while processing path patterns. Figure 11 presents the results of our experiments performed with a variety of path patterns. We measured the total elapsed time of each algorithm with 500,000 path patterns while changing various properties of the path patterns, e.g., the maximum depth and the ratios of wildcards and double slashes in the set of path patterns. As shown in the figures, D-DIXY outperformed the other algorithms in all cases. Figure 12 presents how the throughput of our algorithms changed over time. We performed this experiment with 500,000 path patterns. YFilter and N-DIXY exhibited steady throughput when processing both the XMark and Treebank datasets. Meanwhile, N-DIXY outperformed YFilter by a larger margin as the number of threads increased. On the other hand, XMLTK and D-DIXY exhibited two distinctive aspects in their throughput changes. First, at the very beginning, the throughput of both D-DIXY and XMLTK on the two datasets was very low, but it then grew quickly over time.
The reason is that when stream processing starts, there are few states in the DFA-style query indexes, so each thread needs to create new states by traversing an NFA-style query index. Second, the throughput of both XMLTK and D-DIXY fluctuated frequently when processing the Treebank dataset. This was because




Fig. 11 Path pattern filtering with various query patterns. a Maximum depth over XMark. b Double slash over XMark. c Wildcard over XMark. d Maximum depth over XMark. e Double slash over XMark. f Wildcard over XMark. g Maximum depth over Treebank. h Double slash over Treebank. i Wildcard over Treebank. j Maximum depth over Treebank. k Double slash over Treebank. l Wildcard over Treebank

there were too many distinct root-to-leaf path patterns in the Treebank dataset, which has a deep tree structure, compared to XMark. Hence, many more new states had to be built while processing the XML stream than in the case of the XMark dataset.

5.2.2 Twig pattern joins

Figure 13 presents the experimental results of our twig pattern join algorithm, MRtwig. We measured the speedup of our algorithm compared to a holistic twig join (HTJ) and the post-processing module of YFilter (Y-post). MRtwig always outperformed both HTJ and Y-post. In addition, the performance gap became larger as both the number of twig queries and the number of cores increased in Fig. 13. However, the speedup of MRtwig is not as large as that of path pattern filtering. It is noteworthy




Fig. 12 Changes in throughput as time goes by (w/500,000 queries). a NFA-based approaches (XMark). b NFA-based approaches (Treebank). c DFA-based approaches (XMark). d DFA-based approaches (Treebank)


Fig. 13 Speedup in twig pattern joins. a Increasing number of queries (8 cores) with XMark. b Increasing number of queries (8 cores) with Treebank. c Increasing number of cores (500,000 queries) with XMark. d Increasing number of cores (500,000 queries) with Treebank




Fig. 14 Effect of optimization in twig pattern joins. a Increasing average input data size (500,000 queries and 8 cores) over XMark. b Increasing average input data size (500,000 queries and 8 cores) over Treebank

that HTJ was not faster than Y-post when processing Treebank data streams, whereas HTJ outperformed Y-post when processing XMark data streams. This implies that HTJ performs worse when processing XML streams whose structures are deep with many recursions. Since there were many distinct linear path patterns in the Treebank dataset, the number of twig pattern joins increased sharply. Figure 14 presents the performance of the MRtwig algorithm with the optimization technique described in Algorithm 4. To show the effectiveness of our optimization technique, we measured the speedup of our algorithm compared to the throughput of Y-post with increasing input data size. As shown in the figure, the performance of the optimized version of MRtwig (op-MRtwig) was slightly improved, and the performance gap became larger as the input data size increased. However, with the Treebank dataset, the optimized version of MRtwig did not beat the original MRtwig by much. As mentioned earlier, the Treebank dataset has very many distinct root-to-leaf linear path patterns, so


Fig. 15 Overall speedup. a Increasing number of queries (8 cores) over XMark. b Increasing number of queries (8 cores) over Treebank. c Increasing number of cores (500,000 queries) over XMark. d Increasing number of cores (500,000 queries) over Treebank



that the number of twig pattern joins was large enough for them to be well dispersed across reducers.

5.2.3 Overall performance

Figure 15 presents the overall performance of our algorithm set, including path pattern filtering and twig pattern joins, in terms of speedup compared to several algorithms. In this experiment, we used the post-processing module of YFilter for the twig pattern joins of the competing algorithms. We also tested two combinations of our algorithms. DIXY+MRtwig is a combination of our DFA-style query indexes and our twig pattern join algorithm implemented in the in-memory MapReduce model. DIXY+Y-post is a combination of our DFA-style query indexes and the post-processing module of YFilter. In all cases, DIXY+MRtwig outperformed the other algorithms. MXQuery exhibited the worst performance in all cases, as it processes a single query at a time. The other algorithms got worse as the number of queries increased. On the contrary, the more queries that were submitted, the better the speedup of our algorithms (DIXY+MRtwig) became. In addition, the speedup of DIXY+MRtwig scaled linearly as the number of cores increased, whereas the speedup of the other algorithms stood still or even deteriorated with an increasing number of queries. Note that we did not test XMLTK in this experiment, as it does not provide any facility for twig pattern joins.

5.2.4 Memory usage

Figure 16 presents the number of DFA states in a single query index in our approach. The XMark dataset had only 344 distinct root-to-leaf path patterns; as a result, the number of states for this dataset stayed steady regardless of the size of the input data stream. On the contrary, the Treebank dataset had 5391 distinct root-to-leaf path patterns, so the number of DFA states increased as more input streams were processed. Figure 17 presents information about memory usage in our approach.
We examined both the total number of DFA states in the multiple query indexes and the sum of the sizes of the matching tables after processing 1000 XML documents from both datasets. As shown in Fig. 17a, b, the total number of DFA states increased as the number of cores increased, because we keep multiple query indexes for path pattern filtering. However, the number of queries rarely influenced the total number of states. Note that a DFA-style query index is much smaller than the other data structures, e.g., a matching table that connects DFA states with their corresponding NFA states and path IDs, as the index just pairs each state ID with its next state ID. On the contrary, in Fig. 17c, d, we observed that the overall size

Fig. 16 Number of states




Fig. 17 Memory usages in our approach. a The total number of DFA states (XMark). b The total number of DFA states (Treebank). c The overall size of matching tables (XMark). d The overall size of matching tables (Treebank)

of the matching tables was proportional to the number of queries rather than to the number of cores. Note that we partitioned the matching tables according to the number of cores, and the partitioned matching tables do not share any items with one another. Thus, the sum of the sizes of the matching tables on n cores is equal to the size of a single matching table on a single core. Consequently, we conclude that the memory footprint of our algorithm set is proportional to the number of queries rather than to the number of cores.

6 Related work

Processing of XML streams has been studied widely over the past decade. Algorithms for XML stream processing can be categorized with respect to their internal structures: automata-based techniques [1,34,36,37], stack-based techniques [6,8,11,20], and array-based techniques [2,25]. Automata-based techniques mainly rely on an automata-style query index, built from a set of queries given before runtime, for filtering XML streams. Early XML streaming systems such as XFilter [1] and XSQ [37] exploit finite state automata. They transition to the next states when a start-element event arrives, while an end-element event induces backtracking to the previous states. However, they exhibit exponential time complexity since they must track many states while processing XML streams. SPEX [34] achieves polynomial time complexity by storing matched sub-results in the stack instead of all matched results. Onizuka presents a hybrid structure called a layered NFA, which uses a state-sharing method to solve the problem of an exponential increase in the number of states [36]. However, all the above algorithms are difficult to use in a multi-user environment since they process only a single XPath query at a time.

Some techniques for multi-query processing of XML streams have also been presented in the literature. The key idea of these techniques is to share common subexpressions among a set of queries. YFilter [14] and GFilter [9] combine a set of XPath queries into a single NFA-style query index by sharing common prefixes and suffixes of different queries, respectively. GFilter also proposes an encoding scheme that concisely indicates path matches, as in the Twig2Stack algorithm [8]. However, these algorithms suffer from an exponential increase in the number of state transitions. To overcome this issue, XPush builds a query index in the form of deterministic pushdown automata with common predicates [19]. XMLTK presents a promising algorithm that instead builds a deterministic finite automaton at runtime, avoiding the powerset computations required to build a full DFA from the corresponding NFA [18]. However, these algorithms are not devised to work in parallel.

Array-based algorithms compile user queries into an array structure that stores the interesting parts of an XML stream. Some algorithms, including TurboXPath [25] and XAOS [2], achieve polynomial time complexity since they do not require any state transitions. XAOS can also evaluate XPath queries with the parent and ancestor axes [2]. However, these algorithms are likewise devised to work with only a single query at a time.

The stack-based algorithms are based on a set of linked stacks that compactly represent the candidate matches of an XPath query, and holistic twig pattern joining techniques fall into this category. Holistic twig pattern join is a multi-way sort-merge join algorithm that avoids producing obsolete path solutions during twig pattern matching [6]. The join algorithm is proven to be I/O optimal for twig patterns that have no parent axis.
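The start/end-element mechanics of these automaton-based filters can be sketched as follows (a minimal illustration in the spirit of XFilter-style systems, with a hand-built transition table and assumed names, not any system's actual code): a start-element event advances the set of active states, while an end-element event backtracks by popping the stack of previous state sets.

```python
# Minimal event-driven NFA filtering sketch. transitions maps
# (state, tag) -> set of next states; '*' stands in for a wildcard step.

def filter_stream(events, transitions, accepting):
    """events: ('start', tag) / ('end', tag) pairs; returns accepting states reached."""
    stack = [{0}]                # stack of active state sets; 0 is the start state
    matches = set()
    for kind, tag in events:
        if kind == 'start':      # advance: follow tag and wildcard transitions
            nxt = set()
            for s in stack[-1]:
                nxt |= transitions.get((s, tag), set())
                nxt |= transitions.get((s, '*'), set())
            stack.append(nxt)
            matches |= nxt & accepting
        else:                    # end element: backtrack to the previous states
            stack.pop()
    return matches

# Query /a/b compiled as states 0 -a-> 1 -b-> 2, with state 2 accepting:
trans = {(0, 'a'): {1}, (1, 'b'): {2}}
evts = [('start', 'a'), ('start', 'b'), ('end', 'b'), ('end', 'a')]
assert filter_stream(evts, trans, {2}) == {2}
```

With many queries compiled into one automaton, the active state sets grow quickly, which is the source of the exponential behavior discussed above.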
Much research has focused on guaranteeing I/O optimality for certain classes of XPath [10,28,29]. iTwigJoin improves I/O optimality for twig patterns with ancestor and parent axes, or with one branching node, by partitioning input streams into a collection of distinct tag+level pairs or a collection of distinct paths [10]. Variants of the B+-tree index have been suggested to skip unnecessary input data so that twig queries can be processed more efficiently [6,24]. TJFast presents a different approach to reducing the size of the input data [29]. It encodes the root-to-leaf path of each leaf node in an extended Dewey order code so that internal nodes can be derived from the encoded values of leaf nodes. Twig2Stack [8] employs a bottom-up scheme for processing twig patterns to reduce the unnecessary intermediate results that are not completely removed in TwigStack [6]. However, the conventional algorithms for holistic twig pattern join were not designed to support XML streams, so new stack-based algorithms have been devised to process them. TwigM achieves polynomial time complexity by finding matches to all query predicates at the close event [11]. StreamTX exploits a block-and-trigger technique to make processing XML streams feasible [20]. However, the aforementioned algorithms consider only a single XPath query. For multi-query processing, they cannot avoid building as many chained stacks as there are queries. Accordingly, performance degradation is unavoidable when a large number of queries are handled over XML streams.
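As a concrete illustration of the tag+level partitioning mentioned above, the sketch below (our own simplification, not code from [10]) keys each element by its tag and document depth, so that a twig join only needs to scan the streams whose key can match a query node:

```python
# Illustrative tag+level stream partitioning: start-element events are grouped
# into distinct streams keyed by (tag, depth). Names here are assumptions.
from collections import defaultdict

def partition_by_tag_level(events):
    """Group start-element events into streams keyed by (tag, level)."""
    streams = defaultdict(list)
    level = 0
    for pos, (kind, tag) in enumerate(events):
        if kind == 'start':
            level += 1
            streams[(tag, level)].append(pos)   # record the node's position
        else:
            level -= 1
    return dict(streams)

# Document <a><b/><a><b/></a></a> yields four distinct tag+level streams:
events = [('start', 'a'), ('start', 'b'), ('end', 'b'),
          ('start', 'a'), ('start', 'b'), ('end', 'b'),
          ('end', 'a'), ('end', 'a')]
assert set(partition_by_tag_level(events)) == {('a', 1), ('b', 2), ('a', 2), ('b', 3)}
```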


Recently, studies on parallel query processing for XML data have been reported in the literature. Machdi et al. propose a parallel TwigStack algorithm for a multicore environment [30]. They divide and store XML data in a bucket per core and load the data into the cache in order to reduce the overheads involved in data partitioning. Feng et al. also divide and store XML data into a bucket for each core and process the data using the PathStack algorithm [15]. Shnaiderman et al. propose a parallel algorithm based on PathStack and TwigStack that better guarantees I/O optimality [39]. However, none of these algorithms considers processing XML data streams with a set of multiple queries. Ogden et al. exploit parallel pushdown transducers to process a small set of XPath queries on XML streams by leveraging data parallelism [33]. They split XML data streams into several chunks with state mappings, i.e., path information from all possible starting states to accepting states. PXQA instead follows task parallelism in that it dynamically divides its query processing task by obtaining a task-dependency graph for dynamic mapping through the proposed algebra [23]. Onizuka proposes an algorithm that minimizes memory usage in a DFA-style query index by reducing the number of DFA states [35]; this is achieved by clustering a set of path queries with respect to axis types. Zhang et al. divide a single NFA index built by the YFilter engine into several NFAs called tasks, and each task is then dynamically allocated to a core to balance workloads among cores [44]. The aforementioned studies, however, focus only on executing an individual or a small set of XPath queries on streaming XML and lack support for twig pattern joins. In contrast to all the above studies, our techniques enable processing a massive set of XPath queries, including twig patterns, over streaming XML in parallel. In addition, our approach guarantees a fair distribution of tasks across cores through index-level parallelism rather than data- or query-level parallelism.
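The contrast with data- and query-level parallelism can be sketched as follows (a simplified model with assumed names, where set membership stands in for FSA-based matching): under index-level parallelism each worker owns a disjoint query index and scans the same stream independently, so no locks or shared state are needed.

```python
# Sketch of index-level parallelism: queries are partitioned round-robin into
# disjoint per-core query indexes; every worker filters the *same* stream
# against only its own index. Workers share nothing, so no synchronization
# is required. The structures here are stand-ins, not our actual indexes.
from concurrent.futures import ThreadPoolExecutor

def build_indexes(queries, n_cores):
    """Round-robin queries into n disjoint query indexes (here: plain sets)."""
    return [set(queries[i::n_cores]) for i in range(n_cores)]

def filter_with_index(index, stream_paths):
    # Stand-in for FSA-based path matching against one core's index.
    return {p for p in stream_paths if p in index}

def parallel_filter(queries, stream_paths, n_cores=4):
    indexes = build_indexes(queries, n_cores)
    with ThreadPoolExecutor(max_workers=n_cores) as pool:
        results = pool.map(filter_with_index, indexes, [stream_paths] * n_cores)
    return set().union(*results)

qs = ['/a/b', '/a/c', '/d', '/a/b/c']
assert parallel_filter(qs, ['/a/b', '/d', '/x']) == {'/a/b', '/d'}
```

Because the union of the per-index results equals the result of matching against one combined index, correctness is preserved while each core proceeds without coordination.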

7 Conclusion

In this article, we presented a set of efficient algorithms, called DIXY, for handling a massive set of twig pattern queries over XML streams on multicore systems. Unlike conventional algorithms, our algorithms process a massive set of twig pattern queries over XML streams in a parallel and balanced way. To achieve this, we parallelized XML stream query processing by virtue of both index-level parallelism and the in-memory MapReduce programming model. In addition, we conducted extensive experiments to test our algorithms. We compared our algorithms with conventional algorithms, i.e., YFilter, XMLTK, and MXQuery, in terms of speedup and throughput. With 8 cores, our algorithm outperformed YFilter by up to about 20 times in path pattern processing. In total, our algorithm processed a massive set of XPath queries over XML streams up to ten times faster than YFilter with good scalability.

Acknowledgements This work was partly supported by two Grants (2015K000260 and B0101-16-2666) funded by the Ministry of Science, ICT and Future Planning and also supported by KAIST and KISTI, Korea.


References

1. Altınel M, Franklin MJ (2000) Efficient filtering of XML documents for selective dissemination of information. In: Proceedings of the 26th International Conference on Very Large Data Bases (VLDB), Cairo, Egypt
2. Barton C, Charles P, Goyal D, Raghavachari M, Fontoura M, Josifovski V (2003) Streaming XPath processing with forward and backward axes. In: Proceedings of the 19th International Conference on Data Engineering, IEEE, pp 455–466
3. Bordawekar R, Lim L, Kementsietsidis A, Kok BWL (2010) Statistics-based parallelization of XPath queries in shared memory systems. In: Proceedings of the 13th International Conference on Extending Database Technology, ACM, pp 159–170
4. Bordawekar R, Lim L, Shmueli O (2009) Parallelization of XPath queries using multi-core processors: challenges and experiences. In: Proceedings of the 12th International Conference on Extending Database Technology, ACM, pp 180–191
5. Bray T, Paoli J, Sperberg-McQueen CM, Maler E, Yergeau F (1998) Extensible Markup Language (XML). World Wide Web Consortium Recommendation REC-xml-19980210. http://www.w3.org/TR/1998/REC-xml-19980210
6. Bruno N, Koudas N, Srivastava D (2002) Holistic twig joins: optimal XML pattern matching. In: Proceedings of the 2002 ACM SIGMOD International Conference on Management of Data, ACM, pp 310–321
7. Robie J, Chamberlin D, Dyck M, Snelson J (2014) XML Path Language (XPath) 3.0. World Wide Web Consortium Recommendation. http://www.w3.org/TR/xpath-30/
8. Chen S, Li HG, Tatemura J, Hsiung WP, Agrawal D, Candan KS (2006) Twig2Stack: bottom-up processing of generalized-tree-pattern queries over XML documents. In: Proceedings of the 32nd International Conference on Very Large Data Bases, VLDB Endowment, pp 283–294
9. Chen S, Li HG, Tatemura J, Hsiung WP, Agrawal D, Candan KS (2008) Scalable filtering of multiple generalized-tree-pattern queries over XML streams. IEEE Trans Knowl Data Eng 20(12):1627–1640
10. Chen T, Lu J, Ling TW (2005) On boosting holism in XML twig pattern matching using structural indexing techniques. In: Proceedings of the 2005 ACM SIGMOD International Conference on Management of Data, ACM, pp 455–466
11. Chen Y, Davidson SB, Zheng Y (2006) An efficient XPath query processor for XML streams. In: Proceedings of the 22nd International Conference on Data Engineering, IEEE, p 79
12. Choi H, Lee KH, Lee YJ (2014) Parallel labeling of massive XML data with MapReduce. J Supercomput 67(2):408–437
13. Dean J, Ghemawat S (2008) MapReduce: simplified data processing on large clusters. Commun ACM 51(1):107–113
14. Diao Y, Altinel M, Franklin MJ, Zhang H, Fischer P (2003) Path sharing and predicate evaluation for high-performance XML filtering. ACM Trans Database Syst (TODS) 28(4):467–516
15. Feng J, Liu L, Li G, Li J, Sun Y (2010) An efficient parallel PathStack algorithm for processing XML twig queries on multi-core systems. In: Proceedings of the 15th International Conference on Database Systems for Advanced Applications, Springer, pp 277–291
16. Fischer P (2013) MXQuery: a lightweight, full-featured XQuery engine. http://mxquery.org/
17. Gou G, Chirkova R (2007) Efficient algorithms for evaluating XPath over streams. In: Proceedings of the 2007 ACM SIGMOD International Conference on Management of Data, ACM, pp 269–280
18. Green TJ, Gupta A, Miklau G, Onizuka M, Suciu D (2004) Processing XML streams with deterministic automata and stream indexes. ACM Trans Database Syst (TODS) 29(4):752–788
19. Gupta AK, Suciu D (2003) Stream processing of XPath queries with predicates. In: Proceedings of the 2003 ACM SIGMOD International Conference on Management of Data, ACM, pp 419–430
20. Han WS, Jiang H, Ho H, Li Q (2008) StreamTX: extracting tuples from streaming XML data. Proc VLDB Endow 1(1):289–300
21. Hill MD, Marty MR (2008) Amdahl's law in the multicore era. Computer 41(7):33–38
22. Hochbaum DS, Shmoys DB (1987) Using dual approximation algorithms for scheduling problems: theoretical and practical results. J ACM 34(1):144–162
23. Huang X, Si X, Yuan X, Wang C (2014) A dynamic load-balancing scheme for XPath queries parallelization in shared memory multi-core systems. J Comput 9(6):1436–1445
24. Jiang H, Wang W, Lu H, Yu JX (2003) Holistic twig joins on indexed XML documents. In: Proceedings of the 29th International Conference on Very Large Data Bases, VLDB Endowment, pp 273–284


25. Josifovski V, Fontoura M, Barta A (2005) Querying XML streams. VLDB J 14(2):197–210
26. Kwon Y, Balazinska M, Howe B, Rolia J (2012) SkewTune: mitigating skew in MapReduce applications. In: Proceedings of the 2012 ACM SIGMOD International Conference on Management of Data, ACM, pp 25–36
27. Lee KH, Lee YJ, Choi H, Chung YD, Moon B (2012) Parallel data processing with MapReduce: a survey. ACM SIGMOD Rec 40(4):11–20
28. Lu J, Chen T, Ling TW (2004) Efficient processing of XML twig patterns with parent child edges: a look-ahead approach. In: Proceedings of the Thirteenth ACM International Conference on Information and Knowledge Management, ACM, pp 533–542
29. Lu J, Ling TW, Chan CY, Chen T (2005) From region encoding to extended Dewey: on efficient processing of XML twig pattern matching. In: Proceedings of the 31st International Conference on Very Large Data Bases, VLDB Endowment, pp 193–204
30. Machdi I, Amagasa T, Kitagawa H (2009) Executing parallel TwigStack algorithm on a multi-core system. In: Proceedings of the 11th International Conference on Information Integration and Web-based Applications & Services, ACM, pp 176–184
31. Marcus MP, Marcinkiewicz MA, Santorini B (1993) Building a large annotated corpus of English: the Penn Treebank. Comput Linguist 19(2):313–330
32. Miliaraki I, Koubarakis M (2012) FoXtrot: distributed structural and value XML filtering. ACM Trans Web (TWEB) 6(3):12
33. Ogden P, Thomas D, Pietzuch P (2013) Scalable XML query processing using parallel pushdown transducers. Proc VLDB Endow 6(14):1738–1749
34. Olteanu D (2007) SPEX: streamed and progressive evaluation of XPath. IEEE Trans Knowl Data Eng 19(7):934–949
35. Onizuka M (2003) Light-weight XPath processing of XML stream with deterministic automata. In: Proceedings of the Twelfth International Conference on Information and Knowledge Management, ACM, pp 342–349
36. Onizuka M (2010) Processing XPath queries with forward and downward axes over XML streams. In: Proceedings of the 13th International Conference on Extending Database Technology, ACM, pp 27–38
37. Peng F, Chawathe SS (2005) XSQ: a streaming XPath engine. ACM Trans Database Syst (TODS) 30(2):577–623
38. Schmidt A, Waas F, Kersten M, Carey MJ, Manolescu I, Busse R (2002) XMark: a benchmark for XML data management. In: Proceedings of the 28th International Conference on Very Large Data Bases, VLDB Endowment, pp 974–985
39. Shnaiderman L, Shmueli O (2015) Multi-core processing of XML twig patterns. IEEE Trans Knowl Data Eng 27(4):1057–1070
40. SyncRO Soft. Oxygen XML Editor. http://www.oxygenxml.com/
41. Talbot J, Yoo RM, Kozyrakis C (2011) Phoenix++: modular MapReduce for shared-memory systems. In: Proceedings of the Second International Workshop on MapReduce and Its Applications, ACM, pp 9–16
42. Wu X, Theodoratos D (2013) A survey on XML streaming evaluation techniques. VLDB J 22(2):177–202
43. Yoo RM, Romano A, Kozyrakis C (2009) Phoenix rebirth: scalable MapReduce on a large-scale shared-memory system. In: Proceedings of the 2009 IEEE International Symposium on Workload Characterization, IEEE, pp 198–207
44. Zhang Y, Pan Y, Chiu K (2010) A parallel XPath engine based on concurrent NFA execution. In: Proceedings of the IEEE 16th International Conference on Parallel and Distributed Systems, IEEE, pp 314–321
