


Condensative Stream Query Language for Data Streams
Lisha Ma and Hamish Taylor School of Mathematical and Computer Sciences Heriot-Watt University, Edinburgh, EH14 4AS, UK Email: lm2,hamish@macs.hw.ac.uk August 14, 2006
Abstract Over a traditional DBMS (Database Management System), the answer to an aggregate query is usually much smaller than the answer to a similar non-aggregate query; we therefore call such query processing condensative. Current proposals for declarative query languages over data streams do not support such condensative processing. To make existing stream query languages more expressive, we propose a new data stream model, referred to as the sequence model, and a novel stream query language, CSQL (Condensative Stream Query Language). We show that the sequence model supports a precise tuple-based semantics that is lacking in the previous time-based model. Moreover, CSQL possesses a declarative semantics that allows one to specify and reason about the different meanings of the frequency with which a query returns answer tuples, which is beyond previous query languages over streams. In addition, a novel condensative stream algebra is defined by extending the existing stream algebra with a new operator, the frequency operator, to capture the condensative property. We show that a condensative stream algebra enables the generation of efficient continuous query plans and can be used to validate query optimisation.

Keywords: Data Stream, Stream Query Language, Window Aggregation, Sequence Model, Stream Algebra, Condensative Queries.

1

Introduction

A data source that constantly publishes readings does so in the form of a data stream. Such streams arise in many areas, such as financial applications, network monitoring and sensor networks. Unlike the traditional data sets managed by a DBMS, stream data arrive as multiple, rapid and constantly changing flows, which makes such data difficult to manage with a conventional DBMS. Data streams have therefore received increased attention in database research. With growing interaction among different academic fields and an increasing demand for information sharing, more challenging requirements have recently emerged in the data stream field, leading to many research problems in semantics, query processing, runtime management, etc. One major challenge is the development of techniques for providing continuously updated answers to aggregate queries over potentially unbounded streams. A general approach to this challenge is the use of window queries, which add window clauses to continuous queries and thus allow aggregate queries to be evaluated over a segment of the input data stream rather than over the entire stream. There has been a great deal of research on algorithms for windowed aggregate queries (Arasu & Widom 2004b, Chandrasekaran & Franklin 2002, Cranor, Johnson, Spataschek & Shkapenyuk 2003, Dobra, Garofalakis, Gehrke & Rastogi 2002a, Dobra et al. 2002a, Gilbert, Kotidis, Muthukrishnan & Strauss 2001, Li, Maier, K.Tufte, Papadimos & Tucker 2005). Driven by different purposes, a number of Distributed Stream Management Systems (DSMSs) have been developed (Babcock, Babu, Datar, Motwani & Widom 2002a, Abadi, Carney, Cetintemel, Cherniack, Convey, Lee, Stonebraker, Tatbul & Zdonik 2003, Yao & Gehrke 2003, Cranor et al. 2003, Chen, DeWitt, Tian & Wang 2000, Chandrasekaran, Cooper, Deshpande, Franklin, Hellerstein, Hong, Krishnamurthy, Madden, Reiss & Shah 2003) and several stream query languages (Arasu, Babcock, Babu, McAlister & Widom 2004, Dobra, Garofalakis, Gehrke & Rastogi 2002b, Hammad, Franklin, Aref & Elmagarmid 2003, Arasu & Widom 2004a) have recently been proposed. However, current techniques are limited in two crucial respects. Firstly, most query languages over streams lack the constructs needed to support condensative query processing over streams, as is available in traditional DBMSs. We call aggregate query processing in a traditional DBMS condensative, since the answer to an aggregate query is usually much smaller than the answer to its non-aggregate counterpart. A declarative stream language that supports condensative query processing is crucial, since data streams are potentially unbounded while main memory and secondary storage have fixed limits; new query results should therefore be generated whenever the window contents change. Secondly, previous work has focused mainly on query evaluation, while fundamental questions concerning data models and formal semantics for queries have not yet been thoroughly addressed. This gap makes it difficult to reason about aggregate queries and to compare different languages under a uniform semantics. The situation is aggravated in distributed computation, which is usually the setting when dealing with data streams.

Prior Work. CQL (Continuous Query Language) (Arasu & Widom 2004a), used in the STREAM system, is one of the most powerful relation-based languages and is defined with a full semantics over a time-based model. CQL provides advanced windowing capabilities: it is even possible to PARTITION a window on an attribute and to specify the width of a window (e.g. ROWS 100 or RANGE 100 MINUTES). However, since the order of tuples is not uniquely defined in a time-based model, there is no clear semantics for continuous queries involving tuple-based sliding windows or moving aggregation. Another expressive language that supports condensation is StreaQuel, implemented in TelegraphCQ (Chandrasekaran et al. 2003). In StreaQuel, each query definition is followed by a for-loop construct that specifies (1) the set of windows over which the query is to be executed, and (2) how often the query should be run. Recently, Li et al. (Li et al. 2005) have proposed a similar but more declarative way to define windowed aggregate queries. Their window definition has three parameters: RANGE specifies the window size, SLIDE the window movement, and WATTR the granularity, that is, whether RANGE and SLIDE are defined in terms of timestamps or tuple sequence numbers. All these patterns are defined with respect to window identifiers.
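To make the window-identifier idea concrete, the sketch below assumes tuple-based granularity (WATTR over 0-based sequence numbers) and assumes that window `wid` covers positions [wid × SLIDE, wid × SLIDE + RANGE); Li et al.'s exact extent function may differ. `window_extent` plays the role of the function that identifies a window's extent, and `windows_of` plays the role of the inverse function that maps a tuple to the windows it belongs to. Both names are hypothetical.

```python
def window_extent(wid: int, rng: int, slide: int) -> range:
    """Sequence numbers covered by window `wid` (assumed extent [wid*slide, wid*slide + rng))."""
    start = wid * slide
    return range(start, start + rng)


def windows_of(pos: int, rng: int, slide: int) -> range:
    """Inverse mapping: ids of every window whose extent contains sequence number `pos`."""
    # Window w contains pos iff w*slide <= pos < w*slide + rng,
    # i.e. (pos - rng + 1) / slide <= w <= pos / slide.
    first = max(0, -(-(pos - rng + 1) // slide))  # integer ceiling division
    last = pos // slide
    return range(first, last + 1)


# A tuple at position 5 with RANGE 4, SLIDE 2 falls into windows 1 ([2, 6)) and 2 ([4, 8)).
print(list(windows_of(5, 4, 2)))  # [1, 2]
```

When SLIDE exceeds RANGE, some positions belong to no window, and `windows_of` then returns an empty range.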
Window-identifier semantics define a function that uniquely identifies each window extent for a given window aggregate query; they also require an inverse function that determines, for each tuple, the extents of the windows to which the tuple belongs. Window-identifier semantics were implemented in an extended version of the Niagara Query Engine (Chen et al. 2000) for evaluating aggregate window queries over data streams. In all three languages, frequency and window length must be defined at the same granularity. Moreover, frequency cannot exist independently of a window expression and must be placed at a fixed position within a query, because the semantics are too limited to interpret different meanings of frequency; e.g., frequency over the input stream cannot be distinguished from frequency over the result stream.

A host of research exists on aggregate query evaluation over data streams. Widely differing approaches employing, e.g., hashing, sampling, sketches and wavelets, to name a few, have been proposed in the literature (Babcock, Datar & Motwani 2004, Gehrke, Korn & Srivastava 2001, Gilbert et al. 2001, Manjhi, Nath & Gibbons n.d.). All of these approaches, however, are workload-driven rather than user-driven. It is desirable to give the user considerable freedom in how windows are defined and handled by the system: the user should be able to declare window semantics at the language level, rather than rely on the system to deduce tradeoffs between accuracy and resource consumption, as is usually the case. This is what we term condensative query evaluation: the user merely declares how frequently sampling should take place, and the system performs aggregation according to that instruction.

In this paper, we introduce the semantics of the CSQL language as well as a novel condensative stream algebra. We illustrate the expressive power of our language with example queries and show how it overcomes some limitations of existing query languages. CSQL aims to extend the expressiveness of stream query languages along the dimension of answer frequency, which is a live issue for continuous queries and for which no analogue exists in classical databases. More specifically, our main contributions are as follows.
1. We present a formal semantics that models time-varying stream data as a function over an ordered sequence domain; we call this the sequence model. Our semantics draws features from existing work on sequence databases (Seshadri, Livny & Ramakrishnan 1995) and from the time-based approach to modelling stream data (Arasu & Widom 2004a), and supports a precise semantics for tuple-based operators that cannot be captured by the existing time-series model.
2. We incorporate sampling and jumping windows in a declarative fashion into a novel condensative stream query language, CSQL. We show that our query language can specify the window queries found in (Arasu & Widom 2004a, Carney, Cetintemel, Cherniack, Convey, Lee, Seidman, Stonebraker, Tatbul & Zdonik 2002, Chandrasekaran et al. 2003, Chen et al. 2000, Yao & Gehrke 2003); it also provides (a) a declarative way to express frequency needs on either a base stream or a derived stream, (b) mixed jumping windows over streams, and (c) nested aggregation. None of these can be realised in existing stream query languages.
3. We introduce a condensative stream algebra by extending the existing stream algebra with a new kind of operator, the frequency operator, together with its concrete semantics. We also introduce the relevant optimisation approaches for our condensative model, namely splitting, interleaving and composition. Furthermore, as an independent operator, the frequency operator can easily be pushed down in a stream algebra to avoid unnecessary computation. This allows us to split aggregate query processing into two levels, tuple sampling and aggregation evaluation, which provides a flexible mechanism for interacting with different advanced aggregate operators.
4. Finally, based on these, we have implemented our conceptual operators in a prototype query engine. To demonstrate the efficiency gained by pushing down the frequency operator for a jumping window query over an ordered sequence stream, we compare it with the window-ID approach presented in (Li et al. 2005). Our experimental results show that a pushed-down frequency operator is effective and outperforms the window-ID approach; moreover, a condensative stream algebra can be reasonably optimised for continuous queries using frequency-based equivalences. We also show how to evaluate "mixed jumping window" queries, which cannot be handled by existing approaches.

Organisation. The remainder of the paper is organised as follows. Section 2 reviews sequence databases and the time-based data model. Sections 3, 4 and 5 introduce the formal semantics of our language. The language syntax is presented in Section 6. Example queries are given in Sections 7 and 8. The main algorithms for CSQL are given in Section 9, and experimental results are presented in Section 10. Section 11 discusses related work and Section 12 concludes the paper.

2

Background

In this section we briefly introduce stream data and the semantics of continuous queries, and review two previous data models from which our model draws some features.

2.1

Stream Data

Recently, a new class of applications over stream data (Chandrasekaran et al. 2003, Cranor et al. 2003, Dobra et al. 2002a) has attracted attention in the database community. Mobile-phone call records, stock ticker data, web usage logs and network monitoring data are all examples of stream data. By its nature, stream data changes constantly and quickly during query execution, and it is normally impossible or very expensive to store it and to operate on it repeatedly. This differs from traditional data sets stored in a database, which can be queried many times with no change or with only small updates. Because of the rapid and continuous change of stream data, traditional DBMSs (Database Management Systems) are often inadequate for handling continuous queries. New technologies are required for stream processing, such as stream query languages, windowed queries, approximate answers, load shedding, stream operator scheduling and stream mining (Arasu & Widom 2004b, Babcock et al. 2004).

2.2

Semantics of Continuous Queries

The first continuous queries were introduced in the Tapestry system (Terry, Goldberg, Nichols & Oki 1992) with an SQL-based language called TQL. Tapestry allows users to pose a permanent query, without joins or time, over a database of e-mails and bulletin-board messages, and continuously returns the relevant answers whenever a new incoming tuple passes the filter. The continuous query semantics are implemented by executing a one-time query periodically, at a fixed answering frequency that pushes information to the user, for example answering the query once every minute. The basic algorithm is shown below.

    FOREVER DO
        Execute Query Q
        RETURN results to user
        Sleep for some period of time
    ENDLOOP

Compared with conventional one-time queries, which are evaluated once over static data sets stored by a DBMS, continuous queries are evaluated continuously. Answers are also streamed continuously and do not stop until the input ceases.
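The periodic-execution loop can be sketched in Python as follows. This is a minimal illustration under our own assumptions: the loop is bounded by a hypothetical `rounds` parameter so that it terminates, and the query and delivery callbacks are placeholders. This naive form re-delivers the complete answer on every round; it does not attempt to return only new results.

```python
import time
from typing import Callable, Iterable, List


def continuous_query(query: Callable[[], Iterable], deliver: Callable[[List], None],
                     period_s: float, rounds: int) -> None:
    """Periodically re-execute a one-time query (bounded stand-in for FOREVER DO ... ENDLOOP)."""
    for _ in range(rounds):
        deliver(list(query()))  # Execute Query Q; RETURN results to user
        time.sleep(period_s)    # Sleep for some period of time


mailbox = ["alert: disk full"]
answers: List[List[str]] = []
continuous_query(lambda: [m for m in mailbox if m.startswith("alert")], answers.append, 0.0, 2)
print(answers)  # [['alert: disk full'], ['alert: disk full']]
```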

2.3

Sequence Database Model

The SEQ sequence model and algebra were introduced by Seshadri et al. in (Seshadri et al. 1995). They define a sequence as an ordering function from the integers (or another ordered domain such as calendar dates) to the items in the sequence. The SEQ model separates the data from the ordering information and can deal with different types of sequence data by supporting an expressive range of sequence queries.

Some operators, such as selection, projection, various set operations, and aggregation (including moving windows) are carried over from the relational model. A number of operators for manipulating sequences have also been developed. The SEQ model has been implemented in SRQL (Sorted Relational Query Language) (Ramakrishnan, Donjerkovic, Ranganathan, Beyer & Krishnaprasad 1998), in which sequences are implemented as logically or physically sorted multi-sets (relations) and the language attempts to exploit the sort order.

2.4

Time-based Stream Model

In a data stream model, data items arrive in a time-varying, continuous and append-only fashion. A formal time-based stream model, together with a declarative Continuous Query Language (CQL) with a formal semantics, has been defined in (Arasu & Widom 2004a); CQL has been implemented in the STREAM system at Stanford. The core of the model is as follows. Let $D_r$ be the set of all tuples that satisfy the schema r, and let T be the set of all timestamps. A stream s with schema r is a subset $s \subseteq D_r \times T$ such that for every $\tau \in T$ the bag $\{e \mid \langle e, \tau \rangle \in s\}$ is finite. We denote by $\mathcal{P}(D_r)$ the set of all subsets of $D_r$. A time-dependent relation R for the schema r is a mapping $R : T \to \mathcal{P}(D_r)$ such that each set $R(\tau)$ is finite. With these definitions, a stream can be transformed into a time-varying relation and vice versa.
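A small sketch of these two definitions, assuming integer timestamps and representing bags with `collections.Counter`. The hypothetical `to_relation` maps each timestamp τ to the bag of elements stamped τ, and `to_stream` goes back. This is only one simple stream-to-relation mapping chosen for illustration; CQL itself converts between streams and relations with window operators and the Istream, Dstream and Rstream operators.

```python
from collections import Counter, defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple

Element = Tuple[Hashable, int]  # <e, tau>: a tuple e from D_r paired with a timestamp from T


def to_relation(s: Iterable[Element]) -> Dict[int, Counter]:
    """R(tau) = bag {e | <e, tau> in s}, for every timestamp tau occurring in s."""
    rel: Dict[int, Counter] = defaultdict(Counter)
    for e, tau in s:
        rel[tau][e] += 1
    return dict(rel)


def to_stream(rel: Dict[int, Counter]) -> List[Element]:
    """Inverse direction: pair every element of R(tau) with tau, in timestamp order."""
    return [(e, tau) for tau in sorted(rel) for e in rel[tau].elements()]


s = [("a", 1), ("b", 1), ("a", 1), ("c", 2)]
rel = to_relation(s)
print(rel[1])  # Counter({'a': 2, 'b': 1})
```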

3

Sequence Model

In a time-based model the order of tuples is not uniquely defined. This drawback leads to ambiguous semantics for continuous queries involving tuple-based sliding windows or moving aggregation. To address this issue, we draw on features of sequence databases and construct a sequence-dependent model for streams. In this section we introduce our model together with a formal semantics. As will be seen in the next two sections, our model is more expressive than the existing time-based model in (Arasu & Widom 2004a). We begin by describing the common properties that different data stream models need to share and that we want to encode into our new model.

We consider that an abstract relational stream should have the following characteristics:
• A stream consists of tuples;
• A stream has a relational schema, and all its tuples comply with that schema;
• A stream develops over time. It is therefore assumed that there is a set T representing the time domain, such as wall-clock time or the natural numbers. A timestamp is any value from T. Timestamps are linearly ordered.

3.1

Relational Schema

A relation schema has the form $r(a_1 : T_1, \ldots, a_k : T_k)$, where r is a relation symbol, $a_1, \ldots, a_k$ are attributes, and $T_1, \ldots, T_k$ are types as in SQL. Timestamps are not included as a default attribute in the schema, since it may be necessary to distinguish several different timestamps associated with a tuple, such as its birth time or its arrival time.

3.2

Time Domain

There is no restriction on whether the time domain uses wall-clock time or the natural numbers. However, the general properties of the time domain still need to be defined. A time domain should be ordered. Let $R \subseteq X \times X$ be an ordering (i.e., R is reflexive, antisymmetric and transitive). For every ordering R, the strict version $R^{<}$ of R is defined by $x R^{<} y$ iff $xRy$ and $x \neq y$. A binary relation R is
• Linear: if for all $x, y \in X$ with $x \neq y$, either $x R^{<} y$ or $y R^{<} x$;
• Dense: if for all $x, y \in X$ with $x R^{<} y$, there is a $z \in X$ such that $x R^{<} z$ and $z R^{<} y$;
• Discrete: if for every two elements $x, y \in X$ with $x R^{<} y$, there are only finitely many elements z between them, i.e., only finitely many z such that $x R^{<} z$ and $z R^{<} y$.

If a linear ordering R is discrete, then for every element $x \in X$ either there is a least element y such that $x R^{<} y$ (the immediate successor of x), or there is no element y such that $x R^{<} y$. We require a time ordering to have the following properties:
1. Any two distinct timestamps must be comparable; that is, the ordering is linear.
2. The ordering is discrete rather than dense.
A time domain with these properties is essentially identical to the integers or to an interval of the integers. This also allows us to define the length of a sliding window: a window of length n consists of the starting point plus the next (n − 1) elements. If a time domain has a first element and is unbounded, it can be represented by the natural numbers.

3.3

Local and Non-Local Semantics

A stream operator is a function $\Phi$ that takes a stream s as input and outputs a stream $\Phi s$. We categorise stream operators as local or non-local. Suppose there is an operator Q that transforms a data stream s in the sequence model into a stream $Qs$ in the time-based model, where $Qs(t)$ denotes the bag of tuples with timestamp t.
Definition 1. $\Phi$ is local if $Qs_1(t) = Qs_2(t)$ implies $(\Phi(Qs_1))(t) = (\Phi(Qs_2))(t)$; otherwise it is non-local. Most relational operators, such as selection σ, are local, while most stream operators, such as the sliding window operators W, are non-local.
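Definition 1 can be exercised on concrete data. In the sketch below (our own illustration, with hypothetical names), a sequence stream is a Python list of (value, timestamp) pairs, `Q` forgets the order and groups items into bags by timestamp, and we read $(\Phi(Qs))(t)$ as the time-t bag of the operator's output. A single pair of streams can only refute locality, never prove it, so this is a counterexample check rather than a proof. The tuple window is assumed to hold the last n tuples.

```python
from collections import Counter
from typing import Callable, Dict, Hashable, List, Tuple

Tup = Tuple[str, int]  # (value, timestamp); list order is the sequence order
Op = Callable[[List[Tup]], List[Tuple[Hashable, int]]]


def Q(s: List[Tuple[Hashable, int]]) -> Dict[int, Counter]:
    """Time-based view Qs: timestamp t -> bag of items stamped t (order is forgotten)."""
    out: Dict[int, Counter] = {}
    for v, t in s:
        out.setdefault(t, Counter())[v] += 1
    return out


def select(pred: Callable[[str], bool]) -> Op:
    """Selection: keep the tuples whose value satisfies pred."""
    return lambda s: [(v, t) for v, t in s if pred(v)]


def tuple_window(n: int) -> Op:
    """W_n, assumed to hold the last n tuples; output j carries the timestamp of s(j)."""
    return lambda s: [(frozenset(v for v, _ in s[max(0, j - n + 1): j + 1]), s[j][1])
                      for j in range(len(s))]


def agrees_at(op: Op, s1: List[Tup], s2: List[Tup], t: int) -> bool:
    """Test the implication of Definition 1 for one pair of streams at timestamp t."""
    assert Q(s1).get(t) == Q(s2).get(t), "premise Qs1(t) = Qs2(t) must hold"
    return Q(op(s1)).get(t) == Q(op(s2)).get(t)


s1 = [("a", 1), ("b", 2)]
s2 = [("z", 1), ("b", 2)]  # differs from s1 only at timestamp 1
print(agrees_at(select(lambda v: v != "z"), s1, s2, 2))  # True
print(agrees_at(tuple_window(2), s1, s2, 2))             # False: the window at t=2 still sees t=1
```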

4

Stream Operators

We show that queries expressible in a time-based model can also be specified in our model. Furthermore, as will be seen in Section 8, our model and operators are capable of expressing queries found in practice that are beyond previous models and languages. Next, we introduce some typical stream operators in our model.

4.1

Selection Operator

We first define the conditions for a selection operator. A term is either an attribute name or a value constant. An atomic condition is an expression of the form $t_1 \rho t_2$, where $t_1, t_2$ are terms and ρ is a comparison such as “<”, “≤”, “=”, “≥” or “>”. Arbitrary conditions can be built from atomic conditions using the Boolean connectives “¬”, “∨” and “∧”. Conditions are denoted by the letter C. For every condition C we define a selection operator $\sigma_C$. Intuitively, $\sigma_C(s)$ is the subsequence of those tuples of stream s that satisfy C. We define $\sigma_C(s)$ for an arbitrary stream s recursively, by specifying the tuple $\sigma_C(s)(j)$ for an arbitrary number $j \in \mathbb{N}$. We first define the set of indices $I_1 = \{k \in \mathbb{N} \mid s(k) \text{ satisfies } C\}$. If $I_1 \neq \emptyset$, let $n_1 = \min I_1$ and define $\sigma_C(s)(1) := s(n_1)$; otherwise let $\sigma_C(s) = \bot$. Now suppose $n_j$ is defined for some $j \in \mathbb{N}$. Then let $I_{j+1} = \{k \in \mathbb{N} \mid s(k) \text{ satisfies } C \text{ and } k > n_j\}$.

Again, if $I_{j+1} \neq \emptyset$, let $n_{j+1} = \min I_{j+1}$ and define $\sigma_C(s)(j+1) := s(n_{j+1})$; otherwise $\sigma_C(s)(j+1)$ is undefined. Also, $\sigma_C(s)(j+1)$ is undefined if $n_j$ is undefined.
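The $n_j$ construction can be transcribed directly for a finite stream. In this sketch (an illustration, not the authors' implementation) positions are 1-based as in the definition, `None` stands for ⊥, and the hypothetical helper `sigma` returns the function $j \mapsto \sigma_C(s)(j)$.

```python
from typing import Callable, List, Optional, TypeVar

X = TypeVar("X")


def sigma(cond: Callable[[X], bool], s: List[X]) -> Callable[[int], Optional[X]]:
    """sigma_C(s) as a function j -> s(n_j) (1-based); None plays the role of bottom."""
    n: List[int] = []  # n[j - 1] holds n_j
    # Scanning k upwards, the first index after n_j whose tuple satisfies C
    # is exactly min I_{j+1} (and the first one overall is min I_1).
    for k in range(1, len(s) + 1):
        if cond(s[k - 1]):
            n.append(k)

    def at(j: int) -> Optional[X]:
        # Undefined once I_j is empty (or n_{j-1} was already undefined).
        return s[n[j - 1] - 1] if 1 <= j <= len(n) else None

    return at


pick = sigma(lambda x: x > 2, [3, 8, 1, 9, 4])
print([pick(j) for j in range(1, 6)])  # [3, 8, 9, 4, None]
```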

4.2

Sliding Window Operators

We use $W_t$ to denote a time-based sliding window and $W_n$ to denote a tuple-based sliding window. Time-based Sliding Window A time-based sliding window $W_t$ is bounded by its temporal size t, even though the number of tuples within the window is not known in advance. It slides whenever time advances, so the sliding rate depends on the time granularity. We write $s_\tau(k)$ for the timestamp of tuple $s(k)$. More formally, we define the output stream $W_t s$ as a sequence of sets $W_t s(j)$, one for each position j of stream s. $W_t s(j)$ is undefined if $s(j)$ is undefined; otherwise $W_t s(j) = \{s(k) \mid s_\tau(k) + t \geq s_\tau(j) \text{ and } k \leq j\}$.

Tuple-based Sliding Window A tuple-based sliding window slides whenever a new tuple arrives. For every $n \in \mathbb{N}$, the tuple-based sliding window $W_n$ over stream s produces a sequence of sets $W_n s(j) = \{s(k) \mid k \geq \max\{0, j - n\} \text{ and } k \leq j\}$.
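Both window definitions can be written as list comprehensions over a finite, timestamp-ordered stream. The sketch uses 0-based list positions and returns each window as a list in arrival order. For $W_t$ it follows the condition $s_\tau(k) + t \geq s_\tau(j)$; for $W_n$ it assumes the window holds exactly the last n tuples, in line with Section 3.2's statement that a window of length n is the starting point plus the next (n − 1) elements. The function names are hypothetical.

```python
from typing import List, Tuple

Tup = Tuple[str, int]  # (value, timestamp); the list is in arrival (sequence) order


def time_window(s: List[Tup], t: int) -> List[List[Tup]]:
    """W_t s(j) = {s(k) | ts(k) + t >= ts(j) and k <= j}, for every position j."""
    return [[s[k] for k in range(j + 1) if s[k][1] + t >= s[j][1]] for j in range(len(s))]


def tuple_window(s: List[Tup], n: int) -> List[List[Tup]]:
    """W_n s(j): assumed here to be the last n tuples up to and including s(j)."""
    return [s[max(0, j - n + 1): j + 1] for j in range(len(s))]


s = [("a", 1), ("b", 2), ("c", 5), ("d", 6)]
print(time_window(s, 1))   # [[('a', 1)], [('a', 1), ('b', 2)], [('c', 5)], [('c', 5), ('d', 6)]]
print(tuple_window(s, 2))  # [[('a', 1)], [('a', 1), ('b', 2)], [('b', 2), ('c', 5)], [('c', 5), ('d', 6)]]
```

Note how the time-based window shrinks back to a single tuple after the gap between timestamps 2 and 5, while the tuple-based window always keeps two tuples once enough have arrived.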

4.3

Frequency Operator

The frequency operator F picks stream tuples according to a defined frequency. Depending on how the frequency is set, we obtain different types of frequency operators: the parameters can be set either by a physical bound (tuple-based) or by a logical bound (time-based). To distinguish the two, we use $F_n$ and $F_t$ to denote a tuple-based and a time-based frequency operator, respectively. Tuple-based Frequency Operator For every natural number $n \in \mathbb{N}$ we have a tuple-based frequency operator $F_n$, which selects every n-th tuple of a stream. Formally: $F_n s(j) = s(n \times j)$. Time-based Frequency Operator For every time instance t, we have a time-based frequency operator $F_t$. Conceptually, it selects the tuples with timestamp $j \times t$, where $j \in \mathbb{N}$, as the stream $F_t s$. If there is no tuple with timestamp $j \times t$, the last tuple within that time slot is output. Formally, $F_t s(j)$ is the tuple of s at position $n_j$, where $j \in \mathbb{N}$: if the set $\{k \in \mathbb{N} \mid (j-1) \times t \leq s_\tau(k) \leq j \times t\}$ is non-empty, let $n_j = \max\{k \in \mathbb{N} \mid (j-1) \times t \leq s_\tau(k) \leq j \times t\}$;

otherwise $n_j$ is undefined. Now $F_t s(j) = s(n_j)$ for all $j \in \mathbb{N}$ for which $n_j$ is defined, and $F_t s(j) = \bot$ otherwise.
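A sketch of both frequency operators over a finite, timestamp-ordered stream (function names hypothetical). `freq_tuples` implements $F_n s(j) = s(n \times j)$ with 1-based j; `freq_time` implements the $n_j = \max\{\ldots\}$ rule for a given number of time slots, returning `None` for ⊥. Because the slot condition $(j-1) \times t \leq s_\tau(k) \leq j \times t$ is inclusive at both ends, a tuple stamped exactly $j \times t$ can be chosen for slot j and again for slot j + 1 if nothing newer arrives.

```python
from typing import List, Optional, Tuple

Tup = Tuple[str, int]  # (value, timestamp), in arrival order


def freq_tuples(s: List[Tup], n: int) -> List[Tup]:
    """F_n s(j) = s(n * j) with 1-based j: every n-th tuple."""
    return [s[n * j - 1] for j in range(1, len(s) // n + 1)]


def freq_time(s: List[Tup], t: int, slots: int) -> List[Optional[Tup]]:
    """F_t s(j) = s(n_j), n_j = max{k | (j-1)*t <= ts(k) <= j*t}; None stands for bottom."""
    out: List[Optional[Tup]] = []
    for j in range(1, slots + 1):
        ks = [k for k, (_, ts) in enumerate(s) if (j - 1) * t <= ts <= j * t]
        out.append(s[max(ks)] if ks else None)
    return out


s = [("a", 1), ("b", 2), ("d", 5), ("e", 8)]
print(freq_tuples(s, 2))   # [('b', 2), ('e', 8)]
print(freq_time(s, 3, 4))  # [('b', 2), ('d', 5), ('e', 8), None]
```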

4.4

Jumping Windows

Sometimes we want a window to jump rather than slide. This can be achieved by applying a frequency operator to a sequence of sets instead of to a sequence of tuples. We call such a window a jumping window. Depending on how the frequency length is defined, we distinguish two types: tuple-based jumping windows and time-based jumping windows. Tuple-based Jumping Window For every number $n \in \mathbb{N}$ and a sequence of sets $W s$ produced by a sliding window operator $W_n$ or $W_t$, we define a tuple-based jumping window $F_n(W s)$, which selects every n-th set of $W s$: $(F_n(W s))(j) = W s(n \times j)$. Time-based Jumping Window For a sequence of sets $W s$ produced by a sliding window operator W applied to the stream s, we obtain a time-based jumping window $F_t(W s)$ by selecting a subsequence $W s(n_j)$ ($j \in \mathbb{N}$) of $W s(n)$. Intuitively, $F_t(W s)(j)$ is the first window that contains an element with a timestamp greater than or equal to $t \times j$. Formally, for an arbitrary stream s and a window operator W, we define $F_t(W s)$ recursively by specifying the set $F_t(W s)(j)$ for an arbitrary number j. We first define the set of indices $I_j = \{i \in \mathbb{N} \mid \exists k.\ s(k) \in W s(i) \wedge s_\tau(k) \geq j \times t\}$.

If $I_j \neq \emptyset$, then we define $n_j := \min I_j$ and $F_t(W s)(j) := W s(n_j)$; otherwise let $F_t(W s)(j) = \bot$. Time-based Jumping Operator For every time instance t and a stream $W s$ produced by applying a sliding window operator W to a stream s, we obtain a time-based jumping window $F_t(W s)$ by selecting a bag of tuples for every multiple of t from the stream $W s$. We then define $(F_t(W s))(\tau)$ for an arbitrary time $\tau \in T$ as:
$$(F_t(W s))(\tau) = \begin{cases} W s(\tau) & \text{if } \tau = j \times t \text{ for some } j \in \mathbb{N} \\ \emptyset & \text{otherwise} \end{cases}$$
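Jumping windows are frequency operators applied to the output of a sliding window. The sketch below (hypothetical names; windows as lists of (value, timestamp) pairs; the sliding window again assumed to hold the last n tuples) implements $(F_n(W s))(j) = W s(n \times j)$ and the $I_j$ / $\min I_j$ rule for the time-based case, with `None` standing for ⊥.

```python
from typing import List, Optional, Tuple

Tup = Tuple[str, int]  # (value, timestamp), in arrival order
Window = List[Tup]


def tuple_window(s: List[Tup], n: int) -> List[Window]:
    """Sliding window W_n, assumed to hold the last n tuples up to position j."""
    return [s[max(0, j - n + 1): j + 1] for j in range(len(s))]


def jump_tuples(ws: List[Window], n: int) -> List[Window]:
    """Tuple-based jumping window: (F_n(W s))(j) = W s(n * j), 1-based."""
    return [ws[n * j - 1] for j in range(1, len(ws) // n + 1)]


def jump_time(ws: List[Window], t: int, count: int) -> List[Optional[Window]]:
    """Time-based jumping window: W s(min I_j), I_j = {i | some tuple in W s(i) has ts >= j*t}."""
    out: List[Optional[Window]] = []
    for j in range(1, count + 1):
        idx = [i for i, w in enumerate(ws) if any(ts >= j * t for _, ts in w)]
        out.append(ws[min(idx)] if idx else None)
    return out


s = [("a", 1), ("b", 2), ("c", 4), ("d", 7)]
ws = tuple_window(s, 2)     # windows: [a], [a, b], [b, c], [c, d]
print(jump_tuples(ws, 2))   # [[('a', 1), ('b', 2)], [('c', 4), ('d', 7)]]
print(jump_time(ws, 3, 3))  # [[('b', 2), ('c', 4)], [('c', 4), ('d', 7)], None]
```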

5

Condensative Stream Queries

A condensative stream query Q is, in essence, a traditional SPJ query (Turner & Lowden 1985) augmented with frequency predicates. Conceptually, such queries have the “canonical” form of Eq. (1) in terms of relational algebra:
$$Q = \pi_{A}\, F(p_1, \ldots, p_n)\, \sigma_{B(c_1, \ldots, c_m)}(R_1 \times \cdots \times R_h) \qquad (1)$$
where A denotes the projected attributes.

That is, on the product of the base relations $(R_1 \times \cdots \times R_h)$, two types of operation are performed, and the projected attributes (as indicated) are returned as the results. Filtering: a Boolean function $B(c_1, \ldots, c_m)$ filters the results through the selection operator $\sigma_B$ (e.g., $B = c_1 \wedge c_2 \wedge c_3$ for example 1). Frequency: a frequency function $F(p_1, \ldots, p_n)$ picks the results from the base relations. Our goal is to support such condensative stream queries efficiently. Condensative stream processing treats Boolean filtering, i.e., $\sigma_{B(c_1, \ldots, c_m)}$, as a first-class construct in query processing. With algebraic support for optimisation, Boolean filtering is virtually never processed in the canonical form of Eq. (1). Consider, for instance, $B = c_1 \wedge c_2$, where $c_1$ is a selection over R and $c_2$ is a join condition over $R \times S$. The algebraic framework supports splitting of selections (e.g., $\sigma_{c_1 \wedge c_2}(R \times S) \equiv \sigma_{c_1}\sigma_{c_2}(R \times S) \equiv \sigma_{c_1}(R \bowtie_{c_2} S)$) and interleaving them with other operators (e.g., $\sigma_{c_1}(R \bowtie_{c_2} S) \equiv \sigma_{c_1}(R) \bowtie_{c_2} S$). These algebraic equivalences enable query optimisation to transform the canonical form into efficient query plans by splitting and interleaving. The same algebraic support for optimisation, splitting and interleaving, is inherited in full by the frequency function $F(p_1, \ldots, p_n)$. Moreover, the support can be compositive. Suppose we have a frequency function $F = p_1 \wedge p_2 \wedge p_3$, where $p_1, p_2, p_3$ are frequency conditions over $R_1, R_2, R_3$ respectively, and are either all time-based or all tuple-based. If $p_3 \bmod p_2 = 0$, $p_3 \bmod p_1 = 0$ and $p_2 \bmod p_1 = 0$, then the frequency functions are compositive, e.g., $F_{p_1}F_{p_2}F_{p_3}(R_1 \times R_2 \times R_3) \equiv F_{p_1}F_{p_2}F_{p_2}(R_1 \times R_2 \times R_3) \equiv F_{p_1}F_{p_1}F_{p_1}(R_1 \times R_2 \times R_3) \equiv F_{p_1}(R_1) \times (R_2 \times_{F_{p_1}} R_3)$. When queries are nested, frequency functions can be compositive even when the frequencies involved do not have the same granularity. For example, for a self-join query $(R_1 \times_{F_{p_1}} R_2) \times_{F_{p_2}} (R_1 \times_{F_{p_1}} R_2)$, the inner frequency condition $p_1$ takes priority and synchronises the outer frequency condition $p_2$: $(R_1 \times_{F_{p_1}} R_2) \times_{F_{p_2}} (R_1 \times_{F_{p_1}} R_2) \equiv (R_1 \times_{F_{p_1}} R_2) \times_{F_{p_1}} (R_1 \times_{F_{p_1}} R_2)$.
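One simple composition law follows directly from the definition $F_n s(j) = s(n \times j)$: applying $F_q$ and then $F_p$ picks positions $p \times q \times j$, so $F_p(F_q\, s) = F_{pq}\, s$. The check below is our own illustration of this identity for tuple-based frequencies on a single stream; it is not a rendering of the join-level rules above, and the helper name `freq` is hypothetical.

```python
from typing import List, TypeVar

X = TypeVar("X")


def freq(s: List[X], n: int) -> List[X]:
    """Tuple-based frequency operator F_n: F_n s(j) = s(n * j) for j = 1, 2, ... (1-based)."""
    return [s[n * j - 1] for j in range(1, len(s) // n + 1)]


s = list(range(1, 25))      # s(k) = k, so outputs show which positions were picked
print(freq(freq(s, 2), 3))  # [6, 12, 18, 24]
print(freq(s, 6))           # [6, 12, 18, 24]
```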

Finally, we extend relational algebra's push-down optimisation to the stream algebra. We categorise the operators in a stream algebra by the local and non-local semantics given in Definition 1, since this distinction underpins our push-down optimisation. An operator can easily be pushed down if it is local, such as a time-based frequency operator or a selection operator, and not otherwise; e.g., for a time-based frequency function F, we have $F(R_1 \times R_2 \times R_3) \equiv F(R_1) \times F(R_2) \times F(R_3)$.

6

Syntax of CSQL

CSQL is a stream language that adds language constructs to SQL to support stream processing. The core syntax of CSQL can be described by a context-free grammar.
    string: any valid string
    number: any valid number
    asterisk: the symbol *

    <Query> → <Select><From> | <Select><From><Where> | <Select><From><Where><GroupBy>
    <Name> → string | <Name>.<Name> | asterisk | <Name> AS <Name>
    <Attribute List> → <Name> | <Name>(, <Name>)*
    <Granularity> → Milliseconds | Seconds | Minutes | Hours | Tuples | Millisecond | Second | Minute | Hour | Tuple
    <Length> → number
    <Frequency> → [<Fre> Partitioned By <Attribute List>] | [<Fre>]
    <Fre> → Frequency <Length> <Granularity>
    <Range> → Range <Length> <Granularity>
    <Compare> → > | < | >= | <= | = | <>
    <Clause> → <Name> <Compare> <Name> | <Name> <Compare> number
    <op> → and | or
    <Condition> → <Clause> | <Clause> (<op> <Clause>)*
    <term> → COUNT | SUM | AVG | MAX | MIN
    <Aggregation> → <term>(<Name>) | <Aggregation> (, <Aggregation>)*
    <Select> → SELECT <SelectTerm> | <Select> <Frequency>
    <SelectTerm> → <Aggregation> | <Attribute List> | <SelectTerm> (, <SelectTerm>)*
    <From> → FROM <FromTerm>
    <LeftBracket> → (
    <RightBracket> → )
    <FromTerm> → <Name> | <Name> <Frequency> | <Name> [<Range>] | <Name> [<Frequency>, <Range>]
               | <LeftBracket><Query><RightBracket> AS <FromTerm> | <FromTerm> (, <FromTerm>)*
    <Where> → WHERE <Condition>
    <GroupBy> → GROUP BY <Attribute List>
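As a rough illustration of the bracketed frequency and range expressions, the regular-expression parser below accepts one bracket containing `Frequency` and/or `Range` items in either order. This is a hypothetical helper, not the CSQL implementation: it deliberately accepts `[Range 1 Minute, Frequency 100 Tuples]`, the order used in the example queries, although the grammar lists `[<Frequency>, <Range>]`, and it only allows `Partitioned By` on a `Frequency` item.

```python
import re
from typing import Dict

GRANULARITY = r"(Milliseconds?|Seconds?|Minutes?|Hours?|Tuples?)"
ITEM = re.compile(
    rf"^\s*(Frequency|Range)\s+(\d+)\s+{GRANULARITY}(?:\s+Partitioned\s+By\s+(.+?))?\s*$",
    re.IGNORECASE,
)
# Split only at commas that start a new Frequency/Range item, so that
# "Partitioned By B.Id, B.Species" stays in one piece.
ITEM_SEPARATOR = re.compile(r",\s*(?=(?:Frequency|Range)\b)", re.IGNORECASE)


def parse_spec(spec: str) -> Dict[str, dict]:
    """Parse e.g. '[Range 1 Minute, Frequency 100 Tuples]' into a dict keyed by item kind."""
    body = spec.strip()
    if not (body.startswith("[") and body.endswith("]")):
        raise ValueError("specification must be enclosed in [ ]")
    result: Dict[str, dict] = {}
    for item in ITEM_SEPARATOR.split(body[1:-1]):
        m = ITEM.match(item)
        if m is None:
            raise ValueError(f"cannot parse {item!r}")
        kind, length, unit, partition = m.groups()
        entry = {"length": int(length), "unit": unit.lower().rstrip("s")}
        if partition:
            if kind.lower() != "frequency":
                raise ValueError("only a Frequency item may be partitioned")
            entry["partitioned_by"] = [a.strip() for a in partition.split(",")]
        result[kind.lower()] = entry
    return result


print(parse_spec("[Range 1 Minute, Frequency 100 Tuples]"))
# {'range': {'length': 1, 'unit': 'minute'}, 'frequency': {'length': 100, 'unit': 'tuple'}}
```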

7

Example Scenario and Queries

Consider a tracking system for studying the behaviour of wild animals, which collects distributed sensor measurements. One type of sensor records an animal's pulse: on every heartbeat it sends out a tuple with a timestamp and the animal's ID. The schema of the relation for these measurements has the form Pulse(Id, Timestamp). The other type of sensor reports an animal's blood pressure and body temperature at regular intervals, for example every (full) second. Its core relation is BodyCondition(Id, Species, BTemp, BloodP, Timestamp). In these two relations, Id is the unique number of each animal, Species is the type of animal, Timestamp is the timestamp, BloodP is the animal's blood pressure, and BTemp is the animal's body temperature. For ease of presentation, we assume that tuples arrive in the order of their timestamp attribute. Here are four queries with requirements on how often to evaluate them.
1. Simple sampling query: for every 100 tuples, report all horses' body condition records.
2. Latest result query: report the latest blood pressure and body temperature measurements for each animal at the rate of one reading every 10 minutes, and evaluate the query for every 100 arriving tuples.
3. Aggregate query: for each animal, what is the pulse rate per minute? We suppose the user wants to know the result for every 10 tuples.

4. Nested aggregate query: for each animal, what is the average pulse rate per hour? We suppose the user wants to make use of the answers to the previous query (Query 3) and expects a result every minute.

8

CSQL vs. Condensative Stream Algebra

We introduce CSQL, a declarative language for continuous queries that is similar to SQL but extended with operators such as those discussed in Section 4, together with a mechanism for directly submitting plans in the query algebra that underlies the language. In CSQL, a frequency operator is expressed by attaching the expression [Frequency F] to a range variable of a stream, say S, where F denotes an interval length. The length is defined either as a number of tuples, [Frequency n Tuples] ("every n tuples"), or as a time period, e.g. [Frequency t Minutes] ("every t minutes"). The operator picks tuples from the base stream according to the predefined frequency length. For group-based sampling we use [Frequency F Partitioned By A1, . . . , Ak]: the operator partitions S into substreams on the grouping attributes A1, . . . , Ak and then, for each substream, picks tuples according to the predefined frequency. Placing the frequency expression in the FROM clause applies it to an input stream, while placing it in the SELECT clause applies it to the result stream. Combining the frequency operator with a sliding window operator makes the window advance in larger steps; we call such a window a jumping window. When Frequency = 1 Tuple, a jumping window is equivalent to an ordinary sliding window. Depending on how the frequency length is defined, we distinguish between tuple-based and time-based jumping windows. Instead of computing the answer whenever a new tuple arrives, the frequency operator triggers a computation only once per frequency interval, so the operator takes a "nap" between two consecutive computations. A jumping window therefore has two parameters: the window size W, meaning that all tuples that arrived during the last period W (or the last W tuples) must be stored for a computation; and the length F of the "nap", meaning that a new window is output only after the nap is over.
A jumping window is always defined by a sliding window operator followed by a frequency operator expression, such as [Range W, Frequency F].
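The two tuple-based constructs just described can be sketched in Python as follows. This is a minimal illustration under our own assumptions, not the CSQL implementation: the function names are hypothetical, [Frequency n Tuples] is modelled as keeping the n-th, 2n-th, ... tuple (of each group when a partitioning function is given), and a tuple-based jumping window [Range w Tuples, Frequency f Tuples] is modelled as a sliding window over the last w tuples whose contents are emitted on every f-th arrival.

```python
from collections import defaultdict, deque
from typing import Callable, Hashable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def frequency_tuples(stream: Iterable[T], n: int,
                     partition_by: Optional[Callable[[T], Hashable]] = None) -> Iterator[T]:
    """Hypothetical model of [Frequency n Tuples (Partitioned By ...)].

    Emits every n-th tuple; when partition_by is given, tuples are counted
    separately within each group (substream).
    """
    counts = defaultdict(int)
    for t in stream:
        key = partition_by(t) if partition_by else None
        counts[key] += 1
        if counts[key] % n == 0:
            yield t


def jumping_window_tuples(stream: Iterable[T], w: int, f: int) -> Iterator[List[T]]:
    """Hypothetical model of [Range w Tuples, Frequency f Tuples].

    A sliding window holding the last w tuples; its contents are reported
    only on every f-th arrival, so the operator "naps" in between.
    """
    window = deque(maxlen=w)  # oldest tuple drops out automatically
    for seq, t in enumerate(stream, start=1):
        window.append(t)
        if seq % f == 0:
            yield list(window)
```

With f = 1 the jumping window reports after every arrival, i.e. it degenerates to an ordinary sliding window: `list(jumping_window_tuples([1, 2, 3], 2, 1))` gives `[[1], [1, 2], [2, 3]]`.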

Our semantics supports mixing tuple-based frequencies with time-based window bounds and vice versa; we call such windows "mixed jumping windows". To enable frequency-based query processing and optimisation, we extend relational algebra (Kießling 2002) into a stream algebra (Babcock, Babu, Datar, Motwani & Widom 2002b) by substituting streams for relations, where the operators and algebraic laws "respect" and take advantage of the "compositive" property introduced in Section 5. We extend this stream algebra with two operators, the frequency operator F and the sliding window operator W, and thereby generalise the existing relational operators (e.g., π, A, σ, G in Figure 1) to be "frequency-based". We illustrate the declarative nature of CSQL by placing the frequency expression at different positions within a query, and we show how these differences affect the stream algebra in Figures 1 and 2. Consider the first query.

Figure 1: Stream Algebra for Example Queries 1 & 2

Query 1 "For every 100 tuples, report all horses' body condition records." The query can be interpreted as "every 100 tuples of the base stream":

Q1 (a):
SELECT *
FROM BodyCondition AS B [Range 1 Minute, Frequency 100 Tuples]
WHERE B.Species = 'horse'

It can also be interpreted as "every 100 tuples of the answer stream" (the stream of horses' body condition records produced by the query):

Q1 (b):
SELECT * [Frequency 100 Tuples]
FROM BodyCondition AS B [Range 1 Minute]
WHERE B.Species = 'horse'

Finally, it can be interpreted as "every 100 tuples of the substream of each animal":

Q1 (c):
SELECT * [Frequency 100 Tuples Partitioned By B.Id]
FROM BodyCondition B [Range 1 Minute]
WHERE B.Species = 'horse'
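To make the difference between Q1 (a), (b) and (c) concrete, the following toy simulation encodes one possible reading of the three queries. It is a sketch under our own assumptions, not the CSQL semantics itself: tuples are dictionaries with hypothetical fields seq, ts (seconds), id and species; n = 2 replaces 100 to keep the data small; Q1 (a) reports the horse records of the last minute on every n-th base tuple; and for Q1 (b) and (c) the un-sampled answer stream is assumed to be the horse records in arrival order.

```python
from collections import defaultdict

RANGE = 60  # [Range 1 Minute], timestamps in seconds


def is_horse(t):
    return t["species"] == "horse"


def every_nth(stream, n, key=None):
    """Stand-in for [Frequency n Tuples (Partitioned By key)]: keep the n-th,
    2n-th, ... tuple of the stream, or of each group when key is given."""
    counts = defaultdict(int)
    kept = []
    for t in stream:
        k = key(t) if key else None
        counts[k] += 1
        if counts[k] % n == 0:
            kept.append(t)
    return kept


def q1a(stream, n):
    """Frequency in FROM: on every n-th base tuple, report the horse records
    of the last minute (a mixed jumping window)."""
    answers = []
    for k, t in enumerate(stream, start=1):
        if k % n == 0:
            now = t["ts"]
            answers.append([u["seq"] for u in stream[:k]
                            if now - RANGE < u["ts"] <= now and is_horse(u)])
    return answers


def q1b(stream, n):
    """Frequency in SELECT: sample every n-th tuple of the answer stream."""
    return [t["seq"] for t in every_nth([t for t in stream if is_horse(t)], n)]


def q1c(stream, n):
    """Frequency in SELECT, partitioned by Id: sample each animal's substream."""
    horses = [t for t in stream if is_horse(t)]
    return [t["seq"] for t in every_nth(horses, n, key=lambda t: t["id"])]


stream = [
    {"seq": 1, "ts": 0, "id": "h1", "species": "horse"},
    {"seq": 2, "ts": 10, "id": "c1", "species": "cow"},
    {"seq": 3, "ts": 20, "id": "h2", "species": "horse"},
    {"seq": 4, "ts": 30, "id": "h1", "species": "horse"},
    {"seq": 5, "ts": 70, "id": "h2", "species": "horse"},
    {"seq": 6, "ts": 80, "id": "c1", "species": "cow"},
    {"seq": 7, "ts": 90, "id": "h1", "species": "horse"},
]

print(q1a(stream, 2))  # [[1], [1, 3, 4], [4, 5]]
print(q1b(stream, 2))  # [3, 5]
print(q1c(stream, 2))  # [4, 5]
```

The same records and the same n produce three different answers, which is the point of letting the position of the frequency expression carry meaning.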

The above comparison shows how CSQL supports different meanings of frequency. Figure 1 gives a more intuitive interpretation of these three queries in terms of stream algebra.

Query 2 "Report the latest measurements of blood pressure and body temperature for each animal at a rate of one reading per minute, and evaluate the query every 100 arriving tuples."

SELECT * [Frequency 1 Minute Partitioned By B.Id]
FROM BodyCondition B [Frequency 100 Tuples Partitioned By B.Id]

This query takes the last 100 tuples, partitions the stream into substreams by equality on the grouping attribute Id, and finally returns a relation containing the latest result from each substream within each minute.

Query 3 "For each animal, what is the pulse rate per minute?", supposing the user wants a result every 10 tuples.

SELECT P.Id, COUNT(*) [Frequency 10 Tuples Partitioned By P.Id]
FROM Pulse P [Range 1 Minute, Frequency 1 Tuple]
GROUP BY P.Id

This mixed jumping window query is evaluated with sliding semantics (Frequency = 1 Tuple): after every arriving tuple, it counts the last minute's worth of Pulse tuples for each animal. It can easily be optimised by using the default jumping semantics Range = Frequency without losing precision in the result. The frequency operator then evaluates the relational query over the window at the end of each nap period. The frequency operator in the SELECT clause samples the result substream of each animal, keeping one tuple out of every ten.

Query 4 "For each animal, what is the average pulse rate per hour?", supposing the user wants to reuse the answers to Query 3 and expects a result every minute.

SELECT PR.Id, AVG(PR.Rate) [Frequency 1 Minute Partitioned By PR.Id]
FROM (SELECT P.Id, COUNT(*) [Frequency 10 Tuples Partitioned By P.Id] AS Rate
      FROM Pulse P [Range 1 Minute, Frequency 1 Tuple]
      GROUP BY P.Id) AS PulseRate PR [Range 1 Hour, Frequency 1 Tuple]
GROUP BY PR.Id

This query contains two nested frequencies, but only the outer query determines how often the inner query is evaluated. We can also register the inner query, or a similar query whose frequency is an integer fraction of 10 tuples, as an independent view, and then use this existing view to answer the new query.
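The reuse condition on frequencies can be checked on a toy stream. This sketch assumes plain, unpartitioned tuple sampling that keeps the n-th, 2n-th, ... tuple; for Partitioned By, the same argument applies to each substream separately. Under that assumption, a view sampled every 5 tuples can answer a query sampled every 10 tuples by keeping every second view tuple, whereas a view sampled every 3 tuples cannot. The helper names are hypothetical.

```python
from itertools import islice


def every_nth(stream, n):
    """Hypothetical stand-in for [Frequency n Tuples]: keep the n-th, 2n-th, ... tuple."""
    return list(islice(stream, n - 1, None, n))


def answer_from_view(view, view_n, target_n):
    """Answer [Frequency target_n Tuples] from a view sampled with
    [Frequency view_n Tuples]; possible only when view_n divides target_n."""
    if target_n % view_n != 0:
        return None
    return every_nth(view, target_n // view_n)


base = list(range(1, 101))   # sequence numbers of the base stream
view = every_nth(base, 5)    # registered view: every 5 tuples
print(answer_from_view(view, 5, 10) == every_nth(base, 10))  # True
print(answer_from_view(every_nth(base, 3), 3, 10))           # None
```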

Figure 2: Stream Algebra for Example Queries 3 & 4

9

Frequency Algorithms

Below we present some of the aggregation algorithms from our implementation of CSQL, categorised by their frequency declaration. The main optimisation in these algorithms is to find the right window for each tuple (a tuple can belong to more than one window). Because we construct windows incrementally within the sequence model, any time-based declaration (of a window or a frequency) may cause the next tuple to jump over some empty windows. A variable jumpto is therefore defined to calculate the starting bound of the target window. Time-based declarations also require two pointers that mark the current insertion and deletion points. Depending on the update requirement (eager or lazy), we either delete old tuples whenever a new tuple is inserted, or delete them only when the timestamp changes. Note that we use a different buffer for each kind of window: a circular buffer of fixed size wn when the window length is given in sequence numbers, and a linked list when the window length is a time range wt.
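The two buffer layouts can be sketched as below, assuming Python's collections.deque stands in for both the circular buffer (via maxlen) and the linked list; the class names are hypothetical. For a time range wt we assume that the window at time now holds the tuples with timestamps in (now - wt, now].

```python
from collections import deque


class CountWindow:
    """Circular buffer for a window length wn given in sequence numbers."""

    def __init__(self, wn: int):
        self.buf = deque(maxlen=wn)  # appending beyond wn evicts the oldest tuple

    def insert(self, value):
        self.buf.append(value)

    def contents(self):
        return list(self.buf)


class TimeWindow:
    """Linked-list buffer for a window length wt given as a time range.

    eager=True : expire old tuples on every insertion.
    eager=False: expire old tuples only when the timestamp changes (lazy).
    """

    def __init__(self, wt: int, eager: bool = True):
        self.wt = wt
        self.eager = eager
        self.buf = deque()  # (timestamp, value) pairs, oldest on the left
        self.last_ts = None

    def _expire(self, now):
        # Deletion pointer: drop tuples whose timestamp is <= now - wt.
        while self.buf and self.buf[0][0] <= now - self.wt:
            self.buf.popleft()

    def insert(self, ts, value):
        if self.eager or ts != self.last_ts:
            self._expire(ts)
        self.last_ts = ts
        self.buf.append((ts, value))  # insertion pointer: the tail

    def contents(self):
        return [v for _, v in self.buf]
```

In this sketch both policies yield the same window contents; the lazy policy merely skips the expiry check while consecutive tuples share a timestamp.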

Algorithm 1 Non-grouping Aggregation with Time-based Frequency
Require: frequency length is a time range; non-grouping aggregate query
Input: tuple, query starting time q, current loop index i
Input: frequency length ft, window length wt
loop
  if wt is a time range and wt ≤ ft then
    jumpto = ft × i - wt + q + 1   {jumpto: sliding window bound}
    while tτ ≥ jumpto + wt do   {tτ: tuple's timestamp}
      i++
      jumpto = ft × i - wt + q + 1
    end while
    while tτ < jumpto do
      discard current tuple, get next tuple
    end while
    while tτ < jumpto + wt do
      insert tuple into the buffer
      get next tuple
    end while
    perform moving aggregation over the buffer
    i++
  else if wt is a time range and wt > ft then
    while tτ > ft × i + q do
      i++
      jumpto = ft × i - wt + q + 1
    end while
    while tτ ≤ ft × i + q and tτ ≥ ft × i - wt + q do
      insert tuple into the buffer
      τO = τhead   {τhead: timestamp of the first tuple in the buffer}
      τN = τtail   {τtail: timestamp of the last tuple in the buffer}
      get next tuple
    end while
    perform aggregation over the buffer
    update the buffer by condition: τN - τO ≤ wt
    i++
  end if
end loop
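A compact Python rendering of Algorithm 1 follows. It is a sketch under stated assumptions rather than the authors' code: timestamps are integers in non-decreasing order and greater than q; the i-th evaluation (i = 1, 2, ...) happens at q + ft·i over the window (q + ft·i - wt, q + ft·i], the bounds of the wt ≤ ft branch; empty windows produce no output; and the aggregate is recomputed over the buffer instead of being maintained incrementally. A single deque serves both branches, and the ceiling-division jump plays the role of jumpto when empty windows are skipped. The function name is hypothetical.

```python
from collections import deque
from typing import Callable, Iterable, Iterator, List, Tuple


def time_frequency_aggregate(
    stream: Iterable[Tuple[int, float]],
    q: int,
    ft: int,
    wt: int,
    agg: Callable[[List[float]], float],
) -> Iterator[Tuple[int, float]]:
    """Yield (evaluation time, aggregate) for each non-empty window."""
    buf: deque = deque()  # (timestamp, value) pairs, oldest on the left
    i = 1
    for ts, value in stream:
        # The current tuple lies beyond window i, so window i is complete.
        while ts > q + ft * i:
            end = q + ft * i
            # Expire tuples at or before the start bound of window i
            # (this also discards tuples that fell into a gap when wt < ft).
            while buf and buf[0][0] <= end - wt:
                buf.popleft()
            if buf:
                yield end, agg([v for _, v in buf])
                i += 1
            else:
                # Nothing buffered: jump straight to the first window whose
                # end bound is >= ts (ceiling division), skipping empty windows.
                i = -(-(ts - q) // ft)
        buf.append((ts, value))
```

For example, with q = 0, ft = 3, wt = 3 and values equal to their timestamps 1 to 6 followed by 11, the sums reported are (3, 6) and (6, 15); the window ending at 9 is empty and is jumped over.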

Algorithm 2 Non-grouping Aggregation with Mixed-jumping Window
Require: frequency length is a time range; non-grouping aggregate query; wt is a sequence number
Input: tuple, query starting time q, current loop index i
Input: frequency length ft, window length wt
loop
  while tτ > ft × i + q do
    i++
  end while
  while tτ ≤ ft × i + q do
    insert tuple into the buffer
    get next tuple
  end while
  perform moving aggregation over the buffer
  i++
end loop
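Algorithm 2 can be sketched in the same style. Assumptions: integer, non-decreasing timestamps; the window length is a tuple count, called wn here and held in a bounded deque acting as the circular buffer; and an answer is produced at the end of every frequency period q + ft·i that received at least one tuple, with periods without arrivals skipped. The function name is hypothetical.

```python
from collections import deque
from typing import Callable, Iterable, Iterator, List, Tuple


def mixed_jumping_aggregate(
    stream: Iterable[Tuple[int, float]],
    q: int,
    ft: int,
    wn: int,
    agg: Callable[[List[float]], float],
) -> Iterator[Tuple[int, float]]:
    """Time-based frequency over a tuple-count window of the last wn tuples."""
    buf: deque = deque(maxlen=wn)  # circular buffer of the last wn values
    i = 1
    for ts, value in stream:
        if ts > q + ft * i:
            # Period i is over: aggregate the last wn tuples seen so far.
            if buf:
                yield q + ft * i, agg(list(buf))
            # Skip periods without arrivals: first boundary q + ft*i >= ts.
            i = -(-(ts - q) // ft)
        buf.append(value)
```

Because the buffer is bounded by a tuple count while the reporting instants are time-based, this is the "mixed" case: with ft = 2 and wn = 2, tuples (1, 1), (2, 2), (3, 3), (4, 4), (9, 9) give (2, 3) and (4, 7).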

10

Experimental Study

To evaluate the effectiveness of our semantics, we implemented our conceptual operators in a prototype query engine and conducted a preliminary experimental study. Our framework was implemented in Java, and the experiments were executed on a 2.4 GHz Pentium IV with 512 MB of physical memory. We report wall-clock timings and compute execution time as the average cost over 10,000 answer tuples. Streaming behavior was simulated with a pull-based execution model: the more efficient the algorithm, the more tuples it is able to process. A frequency operator spends its time sampling and aggregating, so there is a clear division of work. We want to show how the sampling cost can be optimised in such an environment, and we treat the efficiency of the aggregation algorithm as an orthogonal issue. We therefore used the same aggregation in all experiments and calculated the execution time as the sum of scanning the input stream and producing the aggregate. As a result, any performance gain we observe is due to the efficiency of the sampling method, which is directly tied to how well the semantics of the operators can be implemented. The experiments are divided into two parts. First, we evaluate the performance of pushed-down frequency operators against the window identifier approach for a tuple-based jumping window (AVG) query. Note that we consider the case where one tuple may belong to multiple windows.

The efficiency of evaluating queries without and with a GROUP BY clause is shown in Figures 3 and 4 respectively. The horizontal axis is the frequency length measured in tuples. The vertical axis is the ratio of the execution time using a pushed-down frequency operator to the execution time of the window identifier approach. The window length is expressed as a percentage of the frequency length; for example, a 30% W/F ratio with a frequency length F of 1000 tuples evaluates the query over a window of 300 tuples. As an independent operator, the frequency operator can easily be pushed down in a query plan to avoid unnecessary computation. This allows us to split aggregate query processing into two levels, (i) tuple sampling and (ii) aggregation evaluation, which provides a flexible mechanism for interacting with different advanced aggregate operators. Our experiments showed that pushing down the frequency operator is an effective technique that significantly outperforms the window identifier approach. Second, we evaluate the efficiency of processing mixed jumping window queries, which cannot be handled by existing approaches. Figure 5 shows the upper bound on the execution time of a mixed jumping window (AVG) query whose frequency length is specified on a tuple basis and whose window length is specified on a time basis. The horizontal axis is the frequency length (measured in tuples), while the window length is measured in seconds. The vertical axis is the total execution time per answer tuple (averaged), in milliseconds. Note that performance could be improved further by employing a more efficient aggregation algorithm.

Figure 3: Cost Ratio of Frequency vs. Window Id Approach (a).

Figure 4: Cost Ratio of Frequency vs. Window Id Approach (b).

Algorithm 3 Aggregation with Partitioned Tuple-based Frequency
Require: partitioned tuple-based frequency
Input: tuple, query starting time q, current inner loop index i for each group
Input: frequency length fn, window length wn or wt
loop
  if window length is a sequence number wn and wn ≤ fn then
    jumpto = fn × i - wn + 1   {sliding window bound}
    while tn ≤ jumpto do
      discard current tuple
    end while
    while tn ≤ fn × i do
      insert tuple into the buffer
      get next tuple
    end while
    perform aggregation over the buffer
    i++
  else if window length is a sequence number wn and wn ≥ fn then
    while tn ≤ fn × i do
      insert tuple into the buffer
      get next tuple
    end while
    perform aggregation over the buffer
    i++
  else if window length is a time range wt then
    while tn ≤ fn × i do
      insert tuple into the buffer
      τO = τhead
      τN = τtail
      update the buffer by condition: τN - τO ≤ wt
      get next tuple
    end while
    perform aggregation over the buffer
    i++
  end if
end loop
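For the two sequence-number branches of Algorithm 3, a bounded deque per group gives a compact sketch: after every fn-th tuple of a group, the aggregate is computed over that group's last wn tuples, which covers both wn ≤ fn and wn ≥ fn. This is our simplification, not the authors' implementation; the time-range branch is omitted, and the function and parameter names are hypothetical.

```python
from collections import defaultdict, deque
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def partitioned_tuple_frequency(
    stream: Iterable[T],
    key: Callable[[T], Hashable],
    fn: int,
    wn: int,
    agg: Callable[[List[T]], float],
) -> Iterator[Tuple[Hashable, float]]:
    """Per group: every fn tuples, aggregate over that group's last wn tuples."""
    # One circular buffer and one sequence counter per group (substream).
    buffers: Dict[Hashable, deque] = defaultdict(lambda: deque(maxlen=wn))
    counts: Dict[Hashable, int] = defaultdict(int)
    for t in stream:
        g = key(t)
        buffers[g].append(t)
        counts[g] += 1  # the tuple's sequence number tn within its group
        if counts[g] % fn == 0:
            yield g, agg(list(buffers[g]))
```

With fn = 2 and wn = 3 over ("a", 1), ("b", 10), ("a", 2), ("a", 3), ("b", 20), ("a", 4), summing the values gives ("a", 3), ("b", 30), ("a", 9): the second report for group a already overlaps the first, as in the wn ≥ fn branch.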

11

Related Work

Aggregation operators such as SUM, COUNT, MIN, MAX and AVG are blocking operators. Since continuous data streams may be infinite, incorporating blocking operators into a stream algebra poses problems. Window queries evaluate the traditional blocking operator (aggregation) incrementally and restrict it to the contents of a window. There has also been much research on stream algorithms for problems where a window operator does not exist or cannot be evaluated efficiently within limited memory, using techniques such as counting, hashing, sampling, summaries, sketches and wavelets (Babcock et al. 2002b, Babcock et al. 2004, Gehrke et al. 2001, Gilbert et al. 2001, Manjhi, Nath & Gibbons 2005). The strong incentive for traditional aggregation has motivated research not only on supporting aggregation over streams but also on evaluating it effectively. We additionally take into account another important property of traditional aggregation processing, which we call condensative in this paper. We expect such semantics to lead to innovation in both stream query languages and query processing. Note also that we aim to realise user-level sampling, rather than a system-level trade-off between accuracy and the amount of memory. Previous approaches have focused on supporting aggregation over streams or on optimising aggregate evaluation.

Figure 5: Performance of Mixed Jumping Window.

12

Conclusion

We have studied stream queries from a theoretical angle. More specifically, we have incorporated sampling and jumping into a query language, CSQL, in a declarative fashion. We have also introduced a formal semantics for both the new data model and the novel stream operators, and proposed the frequency operator to give stream query languages more expressiveness, allowing, e.g., user-defined sampling and condensative query processing. The frequency operator can be combined with the existing sliding window operators, which yields powerful jumping window operators and even mixed windows that cannot be handled by existing query languages. Furthermore, we have developed a simple yet effective optimization technique to implement the semantics.

During the last three years, our group has participated in the R-GMA project, which has developed a novel type of Grid information system (Ma, Viglas, Li & Li 2005, Cooke, Gray, Ma, Nutt, Magowan, Oevers, Taylor, Byrom, Field, Hicks, Leake, Soni, Wilson, Cordenonsi, Cornwall, Djaoui, Fisher, Podhorszki, Coghlan, Kenny & O'Callaghan 2003). One issue arising from this cooperation was how to integrate monitoring data that arrives in stream format. To date, R-GMA can handle simple continuous selection queries within a view-based architecture. As ongoing work, we are looking into ways to handle more complex queries, which require the automatic construction of distributed query plans based on techniques for answering continuous queries using continuous views. A key operation in creating such plans is deciding whether one query is contained in another. While this problem has been thoroughly investigated for queries over static databases, it is still open for continuous queries. From a purely theoretical perspective, perhaps the most interesting open question is how to define extensions of relational operators that handle data stream constructs, and how to study the resulting "stream algebra" and other properties of these extensions. Because CSQL can describe how to transform streams into smaller result streams, we expect it to be useful for queries in large distributed applications. We believe such a foundation is key to developing a general-purpose, well-understood query processor for distributed data streams.

References
Abadi, D. J., Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., Stonebraker, M., Tatbul, N. & Zdonik, S. B. (2003), ‘Aurora: a new model and architecture for data stream management.’, 12(2), 120–139.
Arasu, A., Babcock, B., Babu, S., McAlister, J. & Widom, J. (2004), ‘Characterizing memory requirements for queries over continuous data streams.’, ACM Transactions on Database Systems 29(1), 162–194.
Arasu, A. & Widom, J. (2004a), A denotational semantics for continuous queries over streams and relations., in ‘SIGMOD Record’, Vol. 33, pp. 6–11.
Arasu, A. & Widom, J. (2004b), Resource sharing in continuous sliding-window aggregates., in ‘VLDB’, Morgan Kaufmann, pp. 336–347.
Babcock, B., Babu, S., Datar, M., Motwani, R. & Widom, J. (2002a), Models and issues in data stream systems., ACM Press, pp. 1–16.
Babcock, B., Babu, S., Datar, M., Motwani, R. & Widom, J. (2002b), Models and issues in data stream systems., in ‘PODS’, ACM Press, pp. 1–16.
Babcock, B., Datar, M. & Motwani, R. (2004), Load shedding for aggregation queries over data streams., in ‘ICDE’, IEEE Computer Society, pp. 350–361.
Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., Seidman, G., Stonebraker, M., Tatbul, N. & Zdonik, S. (2002), Monitoring streams: A new class of data management applications., in ‘VLDB’, Morgan Kaufmann, pp. 215–226.
Chandrasekaran, S., Cooper, O., Deshpande, D., Franklin, M., Hellerstein, H., Hong, W., Krishnamurthy, S., Madden, S., Reiss, F. & Shah, M. (2003), TelegraphCQ: Continuous dataflow processing., in ‘SIGMOD Conference’, pp. 668–668.
Chandrasekaran, S. & Franklin, M. (2002), Streaming queries over streaming data., in ‘VLDB’, pp. 203–214.
Chen, J., DeWitt, D., Tian, F. & Wang, Y. (2000), NiagaraCQ: A scalable continuous query system for internet databases., in ‘SIGMOD Conference’, ACM, pp. 379–390.
Cooke, A., Gray, A., Ma, L., Nutt, W., Magowan, J., Oevers, M., Taylor, P., Byrom, R., Field, L., Hicks, S., Leake, J., Soni, M., Wilson, A., Cordenonsi, R., Cornwall, L., Djaoui, A., Fisher, S., Podhorszki, N., Coghlan, B., Kenny, S. & O’Callaghan, D. (2003), R-GMA: An information integration system for grid monitoring, in ‘CoopIS/DOA/ODBASE’, pp. 462–481.
Cranor, C., Johnson, T., Spataschek, O. & Shkapenyuk, V. (2003), Gigascope: A stream database for network applications., in ‘SIGMOD Conference’, ACM Press, pp. 647–651.
Dobra, A., Garofalakis, M., Gehrke, J. & Rastogi, R. (2002a), Processing complex aggregate queries over data streams., in ‘SIGMOD Conference’, ACM, pp. 61–72.
Dobra, A., Garofalakis, M., Gehrke, J. & Rastogi, R. (2002b), Processing complex aggregate queries over data streams., in ‘SIGMOD Conference’, ACM Press, pp. 61–72.
Gehrke, J., Korn, F. & Srivastava, D. (2001), On computing correlated aggregates over continual data streams., in ‘SIGMOD Conference’, ACM, pp. 13–24.
Gilbert, A., Kotidis, Y., Muthukrishnan, S. & Strauss, M. (2001), Surfing wavelets on streams: One-pass summaries for approximate aggregate queries., in ‘VLDB’, Morgan Kaufmann, pp. 79–88.
Hammad, M., Franklin, M., Aref & Elmagarmid, A. (2003), Scheduling for shared window joins over data streams., in ‘VLDB’, Morgan Kaufmann, pp. 297–308.
Kießling, W. (2002), Foundations of preferences in database systems., in ‘VLDB’, Morgan Kaufmann, pp. 311–322.
Li, J., Maier, D., Tufte, K., Papadimos, V. & Tucker, P. (2005), Semantics and evaluation techniques for window aggregates in data streams., in ‘SIGMOD Conference’, pp. 311–322.
Ma, L., Viglas, S., Li, M. & Li, Q. (2005), Stream operators for querying data streams., in ‘WAIM Conference’, Vol. 3739, Springer, pp. 404–415.
Manjhi, A., Nath, S. & Gibbons, P. (2005), Tributaries and Deltas: Efficient and robust aggregation in sensor network streams., in ‘SIGMOD Conference’, ACM, pp. 287–298.
Manjhi, A., Nath, S. & Gibbons, P. (n.d.), Tributaries and Deltas: Efficient and robust aggregation in sensor network streams., in ‘SIGMOD Conference’, ACM Press, pp. 287–298.
Ramakrishnan, R., Donjerkovic, D., Ranganathan, A., Beyer, K. & Krishnaprasad, M. (1998), SRQL: Sorted relational query language., in ‘SSDBM’, IEEE Computer Society, pp. 84–95.
Seshadri, P., Livny, M. & Ramakrishnan, R. (1995), SEQ: A model for sequence databases., in ‘ICDE’, IEEE Computer Society, pp. 232–239.
Terry, D., Goldberg, D., Nichols, D. & Oki, B. (1992), Continuous queries over append-only databases, in ‘SIGMOD Conference’, ACM, pp. 321–330.
Turner, R. & Lowden, B. T. (1985), An introduction to the formal specification of relational query languages., in ‘Computer Journal’, Vol. 28, British Computer Society, pp. 162–169.
Yao, Y. & Gehrke, J. (2003), Query processing in sensor networks, in ‘CIDR’, pp. 233–244.


