7 Replies Latest reply on Apr 30, 2015 6:58 AM by sannegrinovero

    Recording commit events into a distributed binary log for consumption by an eventually consistent external client

    sannegrinovero

      Hello all,

      I could use some design advice related to feeding a binary log of changes generated from a Hibernate/JPA application, typically deployed on WildFly (i.e. using Narayana).

       

      We register a javax.transaction.Synchronization as soon as any write is detected via Hibernate event listeners, and collect the changes in that Synchronization instance. We then want to record some work, to be done eventually, in a binary log, by listening for successful after-completion events.
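
      A minimal sketch of that collection step, under stated assumptions: the Synchronization interface below is a local stand-in with the same two callbacks as javax.transaction.Synchronization, declared only so the example compiles without the JTA API; ChangeCollector, recordChange and the logAppender sink are hypothetical names, not part of Hibernate or Narayana. In a real deployment the instance would be registered through Transaction.registerSynchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Local stand-in with the same callbacks as javax.transaction.Synchronization,
// declared here only so the sketch compiles without the JTA API jar.
interface Synchronization {
    void beforeCompletion();
    void afterCompletion(int status);
}

// Hypothetical collector: one instance per transaction.
class ChangeCollector implements Synchronization {
    // Same value as javax.transaction.Status.STATUS_COMMITTED.
    static final int STATUS_COMMITTED = 3;

    private final List<String> changes = new ArrayList<>();
    // Hypothetical sink standing in for the binary log.
    private final Consumer<List<String>> logAppender;

    ChangeCollector(Consumer<List<String>> logAppender) {
        this.logAppender = logAppender;
    }

    // Would be called from a Hibernate event listener when a write is detected.
    void recordChange(String entityKey) {
        changes.add(entityKey);
    }

    @Override
    public void beforeCompletion() {
        // Nothing to do: the outcome of the transaction is not known yet.
    }

    @Override
    public void afterCompletion(int status) {
        // Forward the collected changes only for committed transactions.
        if (status == STATUS_COMMITTED && !changes.isEmpty()) {
            logAppender.accept(new ArrayList<>(changes));
        }
    }
}
```

      Note that nothing here says anything about the order in which two nodes reach afterCompletion, which is exactly the open question below.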

       

      The problem is this binary log needs to be reliable, distributed, and respect ordering of these logged events. I haven't decided which log implementation I could use for that - currently evaluating Apache Kafka, or a RAFT implementation such as JGroups-Raft - but my concern is with the definition of "ordering".

       

      What we need is that the consumer which will eventually process this log, should be able to apply operations which relate to the same "database entity" (same record) in the same order as transactions have been applied on the database; for example I should in theory be able to replicate the database state. It's currently not my intention to actually provide a database replication feature, but that's possibly an interesting by-product as it would pave the road for a JavaEE friendly alternative for facilities such as XStreams and GoldenGate (both Oracle proprietary).

       

      If I were to use a replicated state machine, it would accept the log entries in the order in which they are enqueued, which doesn't necessarily match the order in which the transactions were applied (am I right?). This is especially true because I'm triggering the state transitions from a post-commit event, which opens up race conditions between the locks being released on the database and the transitions being propagated across multiple nodes in the cluster.

       

      By reading the documentation of Apache Kafka, this solution seems more promising than a replicated state machine in terms of performance, but it seems to only guarantee ordering from each event producer independently, so I'm afraid that's not good enough in case of clustered applications deployed on multiple AS instances. And I'd have the same ordering problem of the race conditions described in the previous paragraph, so I need additional hints about this "order" concept.

       

      Assuming one could patch/configure the jgroups-raft implementation or the Kafka implementation to use a different definition of "order" - or worst case implement my own binary log - would Narayana be able to help with such a problem?

       

      For example I think I remember in certain configurations the Transaction ID can be used to help identify some form of "global order" even across multiple nodes in the cluster - however it would not be correct to base ordering on an identifier which gets generated at the beginning of a transaction, so I'm wondering if there is a similar concept which could be used to track ordering of commit operations. I'm aware I could probably get quite far by using timestamps, but I'd prefer a more reliable solution.

      Or should I rather look into Hibernate ORM's Flush and Lock events? Are relational databases going to generate some meaningful metadata? (I'm willing to patch Hibernate ORM to get these, if they exist)

       

      Another thing I'm wondering, is if the Transaction log from Narayana could be used (abused) to store such a binary application log. It would need to be able to store additional payload and ultimately make it possible to have a remote consumer read the entries generated by all nodes of the application in global order.

       

      Thanks in advance for any pointer or suggestions!

      Sanne

        • 1. Re: Recording commit events into a distributed binary log for consumption by an eventually consistent external client
          tomjenkinson

          Hi Sanne,

           

          One initial observation, is it important to you that Synchronization::afterCompletion(int Status) may not be called in the case of failure? If so you are likely going to need to register your own XAResource with the transaction as that is the only way to be guaranteed a callback.

           

          We are still reviewing your message...

           

          Thanks,

          Tom

          • 2. Re: Recording commit events into a distributed binary log for consumption by an eventually consistent external client
            tomjenkinson

            Also, you can add your own record type to Narayana to record data instead of an XAR. Commonly this is done by extending AbstractRecord: narayana/AbstractRecord.java at master · jbosstm/narayana · GitHub

            There is some documentation over here: Narayana Project Documentation

            • 3. Re: Recording commit events into a distributed binary log for consumption by an eventually consistent external client
              tomjenkinson

              A further issue you might come across is that the way the update is applied is (possibly) not strictly guaranteed by the resource manager (at least not that I am aware).

               

              Consider a TX with multiple XARs registered. It is possible that the RM goes down after prepare with multiple TXs active. That means that the recovery manager will take over. The failure could be transient, so further updates in another TX then apply successfully. That way update 2 (for example) may be committed before the recovery manager is able to successfully commit update 1.

               

              Would that be an issue?

              • 4. Re: Recording commit events into a distributed binary log for consumption by an eventually consistent external client
                sannegrinovero

                One initial observation, is it important to you that Synchronization::afterCompletion(int Status) may not be called in the case of failure? If so you are likely going to need to register your own XAResource with the transaction as that is the only way to be guaranteed a callback.

                No, that's ok. We only need to listen to successful transactions, and don't have resources which could need cleanup depending on this, so it's ok if some synchronizations are simply lost without a callback.

                 

                Also, you can add your own record type to Narayana to record data instead of an XAR.

                That's very interesting. I guess it would require some cross-team cooperation to get such an extension hooked up inside WildFly, depending on which capabilities get activated for a specific user deployment. I'll keep it in mind, but I'd need to solve the ordering puzzle first for any of this to be of use.

                 

                A further issue you might come across is that the way the update is applied is (possibly) not strictly guaranteed by the resource manager (at least not that I am aware).

                 

                Consider a TX with multiple XARs registered. It is possible that the RM goes down after prepare with multiple TXs active. That means that the recovery manager will take over. The failure could be transient, so further updates in another TX then apply successfully. That way update 2 (for example) may be committed before the recovery manager is able to successfully commit update 1.

                 

                Would that be an issue?

                Our goal is to stay in sync with the relational database, so I guess we're open to such issues, but then that's business as usual, and people need to rely on the recovery manager exactly to restore correctness to the world, right? So I guess it's ok to accept that we might be out of sync for as long as partial failures still need to be fixed. What we would need is to make sure that when the changes are finally applied, they are also sent to our log (if they haven't been already).

                Consider that what I'm exploring is a performance optimisation for the happy path: I'm assuming we could detect that such a recovery was necessary, then trigger our less efficient alternative to repair from exceptional cases in which there's ambiguity about ordering. Thanks a lot for highlighting this case though! I definitely need to keep that in mind.

                 

                 

                My first and foremost question is about the ordering of events though. I don't need Total Order in this binary log but at least changesets which share some common entries should not be reordered. Hibernate could do some magic but it's constrained to a single JVM (and would probably be very complex to do correctly), while the database should be able to see the full picture as it has to resolve such conflicts via some commitment ordering.

                Looks like I'd need to access the database log, wondering if there's any helpful metadata accessible to our code. I'm guessing there isn't and I should look at an alternative approach?

                 

                Would the transaction ids represent a usable ordering criterion if the isolation level were set to Serializable?

                • 5. Re: Recording commit events into a distributed binary log for consumption by an eventually consistent external client
                  epbernard

                  I don't think you can guarantee what you are looking for in the general sense unless you attach some kind of totally ordered id to each transaction. And even that might not even make sense if the first database is accepting weaker concurrency.

                   

                  One possible thing you can do is rely on the version field of an entity to ensure that you won't apply a message if a message with a more recent version has already been applied. This is a specific case but would work for you I think:

                  - partition work per entity type

                  - pass the version number in the message

                  - store the version in the index and/or in memory, in the unique indexer (your message log consumer)

                  - ignore messages when message.entity.version < applied.entity.version

                   

                  This works for entities with optimistic locking and because when indexing you rely on the full data and not a change diff. That way an index is always up to date.
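
                  The steps above can be sketched as a small consumer-side filter. This is only an illustration under assumptions: VersionFilter, shouldApply and the "Book#1"-style entity keys are hypothetical names, and the applied versions are kept in memory only, whereas the list also allows storing them in the index.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side filter; the key format and names are illustrative.
class VersionFilter {
    // Highest version applied so far, per entity key (for example "Book#42").
    private final Map<String, Long> applied = new HashMap<>();

    // Returns true if the message should be applied, false if it is stale.
    boolean shouldApply(String entityKey, long version) {
        Long current = applied.get(entityKey);
        if (current != null && version < current) {
            // A message with a more recent version was already applied.
            return false;
        }
        // Equal versions are applied again: the message carries the full
        // entity state rather than a diff, so re-applying it is harmless.
        applied.put(entityKey, version);
        return true;
    }
}
```

                  For example, after version 2 of "Book#1" has been applied, a late message carrying version 1 is ignored, while version 3 is applied as usual.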

                  • 6. Re: Recording commit events into a distributed binary log for consumption by an eventually consistent external client
                    mmusgrov

                    Did you consider Tango (http://sigops.org/sosp/sosp13/papers/p325-balakrishnan.pdf)? It provides "Distributed Data Structures over a Shared Log" and has properties such as linearizability, persistence and high availability from a shared log.

                     

                    Linearizability gives you your ordering requirement.

                     

                    The default implementation of Narayana transaction ids encodes the creation time (but I suspect that you are more interested in the commit time). If you can guarantee that all servers are using the same time, and creation time is good enough, then I think using the id to order transactions will work.

                    • 7. Re: Recording commit events into a distributed binary log for consumption by an eventually consistent external client
                      sannegrinovero

                      Hi Mike,

                      thanks for the interesting paper!

                       

                      But you're right, using the creation time isn't suitable. I would actually prefer to avoid timestamps altogether, as they push a lot more complexity onto the infrastructure, such as clock synchronization hell.

                      The commit timestamp could be a solution as long as it was delivered by the database, and the RDBMS was configured for serializable transactions, but I understand from other answers (and a chat with Jonathan) that such details just aren't available.

                       

                      Emmanuel: yes that's probably a wiser direction, although it would require versioned entities.

                      I'm thinking that we only need this capability to "sort" messages when there is a possibility that they were generated by a very close sequence of transactions which modify the same entities. But we can detect whether there is a risk of ambiguity by looking ahead in the log:

                      We should be able to process the log stream in chunks and inspect each chunk for potentially problematic event reorderings. Only when a chunk does contain such a critical entity would we resort to a fresh read from the RDBMS, which then acts as the final decision maker about what the correct state should be.

                      This implies that sometimes we'll have to re-read the same entity graph from the database, but I suspect that in practice this would affect a small minority of the events being processed. That would provide the correctness I'm looking for without the large overhead we would incur by re-reading every single entity graph in the indexing backend.
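
                      A rough sketch of that look-ahead, assuming each log entry carries a transaction identifier and an entity key (LogEntry, ChunkInspector and ambiguousEntities are hypothetical names): an entity is flagged when more than one transaction in the chunk wrote to it, and only flagged entities would be re-read from the RDBMS.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical log entry: which transaction touched which entity.
class LogEntry {
    final String txId;
    final String entityKey;

    LogEntry(String txId, String entityKey) {
        this.txId = txId;
        this.entityKey = entityKey;
    }
}

class ChunkInspector {
    // Returns the entity keys written by more than one transaction in the
    // chunk. Their relative order in the log cannot be trusted, so they are
    // candidates for a fresh read from the database.
    static Set<String> ambiguousEntities(List<LogEntry> chunk) {
        Map<String, String> firstWriter = new HashMap<>();
        Set<String> ambiguous = new LinkedHashSet<>();
        for (LogEntry entry : chunk) {
            String previous = firstWriter.putIfAbsent(entry.entityKey, entry.txId);
            if (previous != null && !previous.equals(entry.txId)) {
                ambiguous.add(entry.entityKey);
            }
        }
        return ambiguous;
    }
}
```

                      This sketch only sees conflicts inside one chunk; two conflicting transactions that land on either side of a chunk boundary would need overlapping chunks or some similar measure, which is left open here.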

                       

                      With such a design, using the entity versions would be a further optimisation: people who have a problem with excess re-reads because of some specific data access pattern would have the option to enable versioning to improve indexing performance. This would make versioning an optional benefit, which I like more than it being a requirement for correctness.

                       

                      Thanks all for your thoughts!