18 Replies. Latest reply on Apr 3, 2012 1:13 PM by markaddleman

    Windowing & Continuous Executions

    markaddleman

      I'd like to propose a mechanism to window continuous executions to allow for a stream of aggregates without introducing new SQL keywords.  For example, a common use case is to get the count of events over a stream of 10 minute time blocks.

       

      Suppose Teiid introduced a windowClosed() method on ExecutionContext. Any translator participating in the plan could call windowClosed() to indicate that the engine should produce a result over the accumulated results as if the execution were closed, but the execution would still be allowed to continue.

       

      I imagine that I could write the following query:

      SELECT window.timestamp, count(t.event)

      FROM (call window.produce_timestamps(60, 'SECONDS')) as window, t

      The "produce_timestamps" stored procedure would be an implementation of ProcedureExecution and ReusableExecution that operates asynchronously.  Every 60 seconds, it would:

      1. Call dataAvailable()
      2. Return a single column result set of the current timestamp
      3. Call windowClosed()

      The engine would then have enough information to produce a result set from the query.  It would then reset all ReusableExecutions and continue processing the plan.

       

      I believe this approach would allow for GROUP BY and ORDER BY operations within execution windows, as well.

       

      Thoughts?

        • 1. Re: Windowing & Continuous Executions
          shawkins

          GROUP BY and ORDER BY are both allowed against continuous executions already.  They just operate on the results returned for each execution set.  Each translator participating in a continuous execution already signals when it is done with an execution set by returning null.  The results are processed together and then the plan is reinitiated and the process is repeated.

           

          If I understand it correctly, your proposal is to add a callback method to indicate that the window is closed, but what about all of the other sources that are participating?  Should we wait until null is returned (in which case a new mechanism is not really needed), or should the other executions be effectively cancelled?

           

          The SQL example would need a minor tweak using a subquery or a group by window.timestamp, but yes, SQL of that form would produce a heartbeat for re-execution, where the stored procedure waits 60 seconds to return the timestamp and then the ending null result.  You could do that rather than relying on the default immediate re-execution or using the source hint mechanism to coordinate a time window.
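
          For concreteness, a heartbeat procedure of that form could be backed by something roughly like the sketch below, using only the existing SPI (dataAvailable(), DataNotAvailableException, and the end-of-results null).  The class name and timer wiring are just illustrative, not anything that ships with Teiid:

          import java.sql.Timestamp;
          import java.util.Arrays;
          import java.util.List;
          import java.util.Timer;
          import java.util.TimerTask;

          import org.teiid.language.Command;
          import org.teiid.translator.DataNotAvailableException;
          import org.teiid.translator.ExecutionContext;
          import org.teiid.translator.ProcedureExecution;
          import org.teiid.translator.ReusableExecution;
          import org.teiid.translator.TranslatorException;

          // Illustrative sketch only: every interval the timer wakes the plan via dataAvailable();
          // next() then returns one timestamp row followed by null to end the execution set.
          public class TimestampHeartbeatExecution implements ProcedureExecution, ReusableExecution<Object> {

              private final ExecutionContext context;
              private final long intervalMillis;
              private final Timer timer = new Timer("window-heartbeat", true);
              private volatile boolean windowExpired;
              private boolean rowReturned;
              private boolean scheduled;

              public TimestampHeartbeatExecution(ExecutionContext context, long intervalMillis) {
                  this.context = context;
                  this.intervalMillis = intervalMillis;
              }

              @Override
              public void execute() throws TranslatorException {
                  if (!scheduled) {                                    // guard in case execute() is called again after reset()
                      scheduled = true;
                      timer.scheduleAtFixedRate(new TimerTask() {
                          @Override public void run() {
                              windowExpired = true;
                              context.dataAvailable();                 // wake the blocked plan
                          }
                      }, intervalMillis, intervalMillis);
                  }
              }

              @Override
              public List<?> next() throws TranslatorException, DataNotAvailableException {
                  if (!windowExpired) {
                      throw new DataNotAvailableException();           // nothing to report yet
                  }
                  if (!rowReturned) {
                      rowReturned = true;
                      return Arrays.asList(new Timestamp(System.currentTimeMillis()));  // single-column row
                  }
                  return null;                                         // end of this execution set
              }

              @Override
              public List<?> getOutputParameterValues() throws TranslatorException {
                  return null;                                         // no OUT parameters
              }

              @Override
              public void reset(Command command, ExecutionContext executionContext, Object connection) {
                  windowExpired = false;                               // start waiting for the next window
                  rowReturned = false;
              }

              @Override public void close() { }
              @Override public void cancel() throws TranslatorException { timer.cancel(); }
              @Override public void dispose() { timer.cancel(); }
          }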

           

          Steve

          • 2. Re: Windowing & Continuous Executions
            markaddleman

            I didn't know there was a source hint mechanism to coordinate the time window.  I suspect that will do exactly what I want (and, as I recall now, what we talked about before).  I didn't see it in Confluence; can you point me to some doc or a test case for the source hint?

            • 3. Re: Windowing & Continuous Executions
              shawkins

              Source hints are touched on in the Developer and Reference guides.  See https://docs.jboss.org/author/display/TEIID/Query+Plans and https://docs.jboss.org/author/display/TEIID/Executing+Commands

              • 4. Re: Windowing & Continuous Executions
                markaddleman

                I may have misread your comment:  Is there an existing source hint to tell the query engine to control the time window?  Or, are you suggesting that each translator interpret a common source hint?

                • 5. Re: Windowing & Continuous Executions
                  shawkins

                  There is no built-in notion of time windows; that would be up to your translators.

                  • 6. Re: Windowing & Continuous Executions
                    markaddleman

                    Gotcha. 

                     

                    Let me run this approach by you and see what you think:

                    • A delegating translator, the TimeWindowingExecutionFactory, understands a source hint that defines the time window
                    • TimeWindowingExecutionFactory creates a delegate execution context to intercept dataAvailable() notifications from the delegate
                    • When the time window expires, the TWEF:
                      • Calls dataAvailable() and returns null on the subsequent next()
                      • Delays any delegate translator calls to dataAvailable() until the null is processed

                     

                    Some care should be taken to ensure that the delegate doesn't call dataAvailable() between the TWEF's call to dataAvailable() and returning null from next().  Also, all TWEF instances participating in the same plan should coordinate with each other so they return null at the same time.  I think we can use ExecutionContext.getRequestIdentifier() for this?
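
                    To make the next() interception concrete, here is a rough sketch of the idea (the dataAvailable() interception and the cross-instance coordination are left out, and the class is hypothetical rather than anything existing in Teiid):

                    import java.util.List;

                    import org.teiid.translator.DataNotAvailableException;
                    import org.teiid.translator.ResultSetExecution;
                    import org.teiid.translator.TranslatorException;

                    // Hypothetical sketch: wraps the delegate execution and ends each execution set
                    // at the window boundary by returning null from next().  Remaining delegate
                    // results would simply be picked up in the following window.
                    public class TimeWindowingExecution implements ResultSetExecution {

                        private final ResultSetExecution delegate;
                        private final long windowMillis;
                        private long windowEnd;

                        public TimeWindowingExecution(ResultSetExecution delegate, long windowMillis) {
                            this.delegate = delegate;
                            this.windowMillis = windowMillis;
                            this.windowEnd = System.currentTimeMillis() + windowMillis;
                        }

                        @Override
                        public void execute() throws TranslatorException {
                            delegate.execute();
                        }

                        @Override
                        public List<?> next() throws TranslatorException, DataNotAvailableException {
                            if (System.currentTimeMillis() >= windowEnd) {
                                windowEnd += windowMillis;        // advance to the next boundary
                                return null;                      // cut off the current execution set
                            }
                            return delegate.next();               // otherwise pass results through
                        }

                        @Override public void close() { delegate.close(); }
                        @Override public void cancel() throws TranslatorException { delegate.cancel(); }
                    }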

                    • 7. Re: Windowing & Continuous Executions
                      shawkins

                      A full description of the current behavior, which does punt on explicit coordination, may help.  Query begins processing as normal.  Once all participating sources have reported an end of results, the current execution set finishes processing.  dataAvailable may be called by any source at any time.  If the plan has been blocked due to DataNotAvailableException or some other blocking mechanism, plan processing will continue.  Regardless of which source called dataAvailable, if data was already retrieved from a source it will not be asked for more results until processing is reinitiated, which happens immediately after the current execution set finishes.

                       

                      The issues here are:

                      -How/whether to coordinate the end of results?

                       

                      What you are proposing sounds like you want to be able to cut off results at some predefined time interval.  It's important to consider that, given our threading model and of course the JVM, you don't have precise control over when exactly processing will begin on a connector request.  My guess at a potential implementation path was to use a time window (which could be specified in the source hint), something like returning results over 1-minute intervals starting at a given time.  It probably makes more sense to give the start processing time as something that is available via the CommandContext, rather than relying on a hint or some other request-id-based lookup to determine a start time.  Each source would then return null based upon the expiration of the current window.  From there, having counters in your ReusableExecutions, incremented in the reset method, would allow you to keep track of which window was to be returned.  Of course a potential issue there is what to do if post/client processing causes processing to be reinitiated after a given window has expired.

                       

                      -How to control when query processing begins again?

                       

                      Here again I was imagining that processing was essentially discrete intervals of a continuum.  So it was appropriate to begin processing immediately after the current execution set finished.  Sources could of course throw DataNotAvailableException until they were ready to return results.  Do you envision wanting coordination of when processing begins again?

                       

                      Steve

                      • 8. Re: Windowing & Continuous Executions
                        markaddleman

                        I'll answer the easy questions first

                         

                        Do you envision wanting coordination of when processing begins again? 

                        No, starting the new execution immediately after the old one is exactly what I want.

                         

                        What you are proposing sounds like you want to be able to cut off results at some predefined time interval.

                        Yes, but this is a soft real-time requirement.  I have an async data source that produces events and I want to know how many events have occurred within a time interval.

                         

                        It's important to consider that given our threading model and of course the JVM you don't have precise control over when exactly processing will begin on a connector request.

                        I think that's fine.  We can require that these async data sources buffer their values so no event is lost.  Also, because this is soft real-time, I don't care if I define 1 minute intervals and, in practice, some intervals are 60100ms and others are 59950ms.  Further, our application can tolerate one data source starting (or even ending) many milliseconds after another one within the same query.  To the extent that we care about any slippage, we simply care to report it to the client and we've found that SQLWarnings (thanks for the suggestion!) are a good solution for this kind of side-band information.

                         

                        Do I understand your point about "when exactly processing will begin on a connector request" correctly?

                         

                        It probably makes more sense to give the start processing time as something that is available via the CommandContext, rather than relying on a hint or some other request-id-based lookup to determine a start time.  Each source would then return null based upon the expiration of the current window.  From there, having counters in your ReusableExecutions, incremented in the reset method, would allow you to keep track of which window was to be returned.

                        As I was thinking about different implementation approaches, I came upon the counter idea, too.  I like it.  In a use case where we have a stream of data values and timestamps coming from a data source that I want to aggregate over a time window, I imagined being able to execute a query:

                        /* sh: every(60, SECONDS) */
                        SELECT time_window.start, time_window.end, data.count(*)
                        FROM (call clock.time_window()) AS time_window, data
                        WHERE timestamp BETWEEN time_window.start AND time_window.end
                        GROUP BY time_window.start, time_window.end

                         

                        clock.time_window() would be a stored procedure that produced a reusable execution of start & end times aligned to the source hint's time window.  It would be a bit easier to write the stored procedure if the execution context contained a counter, but I consider that gravy.  I suppose if there were multiple reusable executions within a given query, it would be useful to have a command-wide counter.

                         

                        A slightly bigger issue is time alignment.  Ultimately, I'd like clock.time_window() to return times that align across queries.  That might mean the first time window is intentionally shortened to make the rest line up.  For example, if I have two different queries producing results every minute, I'd like the time stamps of each query to be aligned.  On second thought, if the engine is in charge of time windows, perhaps it is better to require the client to start the query on a time-aligned boundary.  That would be acceptable to us.

                         

                        Of course a potential issue there is what to do if post / client processing causes processing to be reinitiated after a given window has expired.

                        In our application, the client is reading off the Teiid thread and immediately posting results to a queue, so this is a very unlikely condition but should be addressed.  I would say that the engine could monitor any time-window "slippage" due to client processing and report it as a SQLWarning along with the result set.  The client could then decide how to handle the slip.  I guess if the slip is too great, the engine could decide to abandon that time window and report it as an empty result set with an appropriate SQLWarning to the client.  Again, for our application, that's unnecessary.

                        • 9. Re: Windowing & Continuous Executions
                          markaddleman

                          RE:

                          /* sh: every(60, SECONDS) */

                          SELECT time_window.start, time_window.end, data.count(*)

                          FROM (call clock.time_window()) AS time_window, data

                          WHERE timestamp BETWEEN time_window.start AND time_window.end

                          GROUP BY time_window.start, time_window.end

                           

                          I see now that this query doesn't make as much sense as I originally thought.  Let me think more about this case...

                          • 10. Re: Windowing & Continuous Executions
                            shawkins

                            Mark,

                             

                            I think we are getting closer.  On "slippage", unless the engine has an explicit notion of a time window, the engine won't be aware that targeted time windows are being missed.  However, based upon your description that the executions will be queuing events, there isn't really a worry about slippage, as you would simply expect all time windows to be processed in order and could drop data or error out if lagging.

                             

                            So the things that need more definition are:

                            -window initial start time - this could be the query start processing time which should be added to the CommandContext

                            -length of window interval - more than likely conveyed via the source hint, but along with the start time could be part of the asynch RequestOptions

                            -interval coordination - we can easily add an execution counter to the CommandContext, then as long as all participating executions have the same interval the results should be correlated

                            -time values in the results - it seems like this may be simpler, such as:

                             

                            /* sh: every(60, SECONDS) */ SELECT count(*), max(data.tstamp) as end, min(data.tstamp) as start FROM data

                             

                            This is assuming data is fetched via a ReusableExecution that can interpret the source hint to produce events over a 60-second interval and you provide an event timestamp as column data.  The client would then, approximately every minute, receive the aggregate result of the count and the start/end timestamps.
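
                            For illustration, such a ReusableExecution might look roughly like the sketch below.  The event buffer, the start time, and the hint-derived interval are stand-ins for whatever your source and hint parsing actually provide:

                            import java.util.List;
                            import java.util.concurrent.BlockingQueue;

                            import org.teiid.language.Command;
                            import org.teiid.translator.DataNotAvailableException;
                            import org.teiid.translator.ExecutionContext;
                            import org.teiid.translator.ResultSetExecution;
                            import org.teiid.translator.ReusableExecution;
                            import org.teiid.translator.TranslatorException;

                            // Rough sketch: events are buffered asynchronously (the source calls dataAvailable()
                            // as rows arrive), next() drains the buffer until the current window expires and then
                            // returns null, and reset() bumps the window counter for the next interval.
                            public class WindowedEventsExecution implements ResultSetExecution, ReusableExecution<Object> {

                                private final BlockingQueue<List<?>> buffer;   // rows of {tstamp, ...event columns}
                                private final long startTime;                  // e.g. the query start time
                                private final long intervalMillis;             // e.g. parsed from /* sh: every(60, SECONDS) */
                                private int window;                            // which interval is currently being produced

                                public WindowedEventsExecution(BlockingQueue<List<?>> buffer, long startTime, long intervalMillis) {
                                    this.buffer = buffer;
                                    this.startTime = startTime;
                                    this.intervalMillis = intervalMillis;
                                }

                                @Override
                                public void execute() throws TranslatorException {
                                    // nothing to do; the asynchronous source is already filling the buffer
                                }

                                @Override
                                public List<?> next() throws TranslatorException, DataNotAvailableException {
                                    long windowEnd = startTime + (window + 1) * intervalMillis;
                                    long now = System.currentTimeMillis();
                                    if (now >= windowEnd) {
                                        return null;                           // this source is done for the interval
                                    }
                                    List<?> row = buffer.poll();
                                    if (row == null) {
                                        // ask to be polled again by the boundary so the ending null is not missed
                                        throw new DataNotAvailableException(windowEnd - now);
                                    }
                                    return row;
                                }

                                @Override
                                public void reset(Command command, ExecutionContext executionContext, Object connection) {
                                    window++;                                  // correlate results to the next interval
                                }

                                @Override public void close() { }
                                @Override public void cancel() throws TranslatorException { }
                                @Override public void dispose() { }
                            }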

                             

                            Steve

                            • 11. Re: Windowing & Continuous Executions
                              markaddleman

                              -window initial start time - this could be the query start processing time which should be added to the CommandContext

                              No problem there.

                               

                              -length of window interval - more than likely conveyed via the source hint, but along with the start time could be part of the asynch RequestOptions

                              I don't really care how it's specified but, conceptually, it seems to belong to the query, and a source hint seems like the best way to accomplish that.

                               

                              -interval coordination - we can easily add an execution counter to the CommandContext, then as long as all participating executions have the same interval the results should be correlated

                              Agreed, the counter is the way to go here.  Questions below

                               

                              -time values in the results - it seems like this may be simpler, such as:

                              /* sh: every(60, SECONDS) */ SELECT count(*), max(data.tstamp) as end, min(data.tstamp) as start FROM data

                              Wouldn't that give me the timestamps of the first and last datapoints in the interval?  I also want the boundary times of the interval, hence the desire to join with an execution which produces the interval boundaries.

                               

                              I want to make sure I grok what you're thinking: 

                              • The CommandContext will contain information about the time windows:  start time and interval counter. 
                              • The engine would increase the interval counter on the start of a new query interval but the engine will not perform any processing directly related to time windows. 
                              • As far as the engine is concerned, the interval ends when all executions return null (just like it does today). 
                              • The executions must individually obey the time interval and return null at the boundaries.

                               

                              Is this right? 

                              • 12. Re: Windowing & Continuous Executions
                                shawkins

                                Yes, the example query would give you the begin/end event times and not the begin/end interval times.  You are also correct that a procedure could give the interval times, or you could add extra columns to your reusable/continuous tables to project out the interval start/end, or we can add accessor methods for the query start and counter so that you can just use scalar functions - query_start_time(), execution_count() - and you can derive the boundary times in the client.
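
                                If those (hypothetical) values were projected as columns, deriving the boundary times on the client would just be arithmetic - for example, given a java.sql.ResultSet rs positioned on a result row and a known 60 second interval:

                                long intervalMillis = 60000L;                               // known from the source hint
                                long queryStart = rs.getTimestamp("q_start").getTime();     // hypothetical query_start_time() column
                                int execCount = rs.getInt("exec_count");                    // hypothetical execution_count() column
                                long windowStart = queryStart + (long) execCount * intervalMillis;
                                long windowEnd = windowStart + intervalMillis;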

                                 

                                • The CommandContext will contain information about the time windows:  start time and interval counter. 

                                Yes, where the start time will be the same value that is already captured for admin API Request objects.

                                 

                                • The engine would increase the interval counter on the start of a new query interval but the engine will not perform any processing directly related to time windows. 

                                Yes, that makes the most sense for me to start with.  Otherwise we may need to start looking at a lot more temporal methods / windowing facilities and time guarantees.

                                 

                                • As far as the engine is concerned, the interval ends when all executions return null (just like it does today). 
                                • The executions must individually obey the time interval and return null at the boundaries.

                                Yes, since we already have an end-of-results marker, there's no need to introduce another one.  From the earlier posts I wasn't clear, though, whether you would rather design your executions in a way that was interruptible - such that someone called an end event which would effectively stop results and cancel further execution.  Given that we don't have guarantees over when connector work items are processed, it seems better to avoid that kind of coordination mechanism.  However, I do realize that requiring each source to obey a time interval puts a burden on the source hint, or possibly a RequestOption, for specifying the interval, rather than using a procedure call to produce a heartbeat.  So this should be explored more as needed.

                                • 13. Re: Windowing & Continuous Executions
                                  markaddleman

                                  From the earlier posts I wasn't clear, though, whether you would rather design your executions in a way that was interruptible - such that someone called an end event which would effectively stop results and cancel further execution.  Given that we don't have guarantees over when connector work items are processed, it seems better to avoid that kind of coordination mechanism.

                                   

                                  I like the small step that you've proposed.  At this stage, it seems that we can write a TimeWindowDelegatingExecutionFactory (imagined above) which might introduce null in the middle of an execution's result set and buffer the remaining result set until the next time window.  Or, the TWDEF could delay the end of a time window until the delegate's execution is complete.  Or, like you say, we could dump the remaining result set on the floor.  I'm not sure what the better solution is at this point and it will probably end up being data source dependent (perhaps controlled by a translator property).

                                   

                                  However, I do realize that requiring each source to obey a time interval puts a burden on the source hint, or possibly a RequestOption, for specifying the interval, rather than using a procedure call to produce a heartbeat.  So this should be explored more as needed.

                                   

                                  Agreed.  I'll report back with our experiences and we can decide if any improvements are necessary.

                                   

                                  or we can add accessor methods for the query start and counter so that you can just use scalar functions - query_start_time(), execution_count() - and you can derive the boundary times in the client.

                                  I have a small preference for this approach but, ultimately, it doesn't matter much.

                                  • 14. Re: Windowing & Continuous Executions
                                    markaddleman

                                    One more thing:  As I indicated above, we have a desire to coordinate time windows across queries.  After giving this some thought, I can't think of a sane way for the engine to control the start of a query (how would it know what valid time boundaries are?), so I think it ought to be controlled by the TWDEF:  it could coordinate with other queries through a singleton clock and delay any execution until the appropriate time boundary.

                                     

                                    This approach introduces a new notion of when the query start time is, though.  Perhaps it is best to leave query start time out of the engine entirely since it seems we're going to need a clock data source?
