-
Notifications
You must be signed in to change notification settings - Fork 333
Feature/consistent cassandra schema migrations #467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/consistent cassandra schema migrations #467
Conversation
not so sure anymore that querying with ALL will work
f5c062c
to
9ba251e
Compare
Improve robustness of casssandra schema migrations in 3 ways
Example error for 1.:
|
1. wait for system schema change propagation across the cluster before running the next statement adding/altering tables to avoid errors like the trace shown below 2. increase timeouts to cassandra to reduce likelihood of migration statement or meta inserts to fail 3. resolve DNS in case host is configured as a DNS name, not IP Example error for 1.: WARN [MessagingService-Incoming-/10.233.99.138] 2018-08-30 15:28:28,310 IncomingTcpConnection.java:103 - UnknownColumnFamilyException reading from socket; closing org.apache.cassandra.db.UnknownColumnFamilyException: Couldn't find table for cfId 57f68f40-ac69-11e8-802e-23be9e1e0824. If a table was just created, this is likely due to the schema not being fully propagated. Please wait for schema agreement on table creation. at org.apache.cassandra.config.CFMetaData$Serializer.deserialize(CFMetaData.java:1517) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.db.ReadCommand$Serializer.deserialize(ReadCommand.java:744) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.db.ReadCommand$Serializer.deserialize(ReadCommand.java:683) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.io.ForwardingVersionedSerializer.deserialize(ForwardingVersionedSerializer.java:50) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.net.MessageIn.read(MessageIn.java:123) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.net.IncomingTcpConnection.receiveMessage(IncomingTcpConnection.java:192) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.net.IncomingTcpConnection.receiveMessages(IncomingTcpConnection.java:180) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.net.IncomingTcpConnection.run(IncomingTcpConnection.java:94) ~[apache-cassandra-3.11.2.jar:3.11.2] INFO [MigrationStage:1] 2018-08-30 15:28:28,413 ColumnFamilyStore.java:411 - Initializing gundeck.meta INFO [MigrationStage:1] 2018-08-30 15:28:29,400 ColumnFamilyStore.java:411 - Initializing gundeck.user_push ERROR [MigrationStage:1] 2018-08-30 15:28:30,107 CassandraDaemon.java:228 - Exception in thread Thread[MigrationStage:1,5,main] java.lang.AssertionError: Table gundeck.push did not have any partition key columns in the schema tables at org.apache.cassandra.schema.SchemaKeyspace.fetchTable(SchemaKeyspace.java:1042) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.schema.SchemaKeyspace.fetchTables(SchemaKeyspace.java:998) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.schema.SchemaKeyspace.fetchKeyspace(SchemaKeyspace.java:957) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.schema.SchemaKeyspace.fetchKeyspacesOnly(SchemaKeyspace.java:949) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.schema.SchemaKeyspace.mergeSchema(SchemaKeyspace.java:1387) ~[apache-cassandra-3.11.2.jar:3.11.2] at org.apache.cassandra.schema.SchemaKeyspace.mergeSchemaAndAnnounceVersion(SchemaKeyspace.java:1366) ~[apache-cassandra-3.11.2.jar:3.11.2]
9ba251e
to
5fbf012
Compare
peers <- systemPeerVersions | ||
case local of | ||
Just localVersion -> return $ (localVersion, peers) | ||
Nothing -> liftIO $ throwIO $ ErrorCall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why you're not simply using error
here? If yes, I think it would be good to document it, because I spent two minutes thinking about it and haven't thought of anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only thing that comes to mind is defensive programming (e.g. what if retry doesn't work with error
?) and I think it's a valid approach, but then the comment should say -- Defensive programming (...)
🙂
Btw I think retry should work with error
but that's not exactly obvious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fisx told me not to use error
here, maybe he can elaborate his reasons? recovering
works indeed with error
and any throwIO
, yes, but retrying
(in retryWhileN
) doesn't catch exceptions. We're not expecting any exceptions here though - I don't mean to catch any exceptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my point was that error === unsafePerformIO . Control.Exception.throwIO . ErrorCall
, and the unsafePerformIO
complicates the semantics of the code. so as long as we're in MonadIO m
anyway, it is easier to reason about what the code does if we use the latter, without the unsafePerformIO
. it actually changes the semantics slightly, because the error is now thrown in the execution of the action, not in the construction. but this is closer to at least my own intuition when i read this function.
does this make sense? perhaps we should have
errorIO :: (Show msg, MonadIO m) => msg -> m a
errorIO = liftIO . Control.Exception.throwIO . ErrorCall
somewhere in the libraries?
cql :: PrepQuery R () (Identity UUID) | ||
cql = "select schema_version from system.peers" | ||
|
||
systemLocalVersion :: Client (Maybe UUID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you switch the order of these definitions to be the same as the one that's used when calling them? (local, peer)
@@ -182,6 +203,49 @@ migrateSchema l o ms = do | |||
metaInsert :: QueryString W (Int32, Text, UTCTime) () | |||
metaInsert = "insert into meta (id, version, descr, date) values (1,?,?,?)" | |||
|
|||
-- | Retrieve and compare local and peer system schema versions. | |||
-- if they don't match, retry once per second for 30 seconds | |||
waitForSchemaConsistency :: Client () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's rather clean and well-commented, thanks
You can have `evaluate (error "foo")` which would have the same semantics
as your solution. However, admittedly I've never seen it used anywhere. (Or
your errorIO.)
I feel rather safe using error instead of errorIO, but I don't mind if
somebody uses errorIO. The only reason I'm asking for a comment is that
this errorIO construction is unexpected enough – and the concept of
exceptions in Haskell is tricky enough – that in the absense of a comment
it's rather natural to assume that something very subtle is going on.
…On Wed, Sep 5, 2018, 12:47 fisx ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In libs/cassandra-util/src/Cassandra/Schema.hs
<#467 (comment)>:
> +-- | Retrieve and compare local and peer system schema versions.
+-- if they don't match, retry once per second for 30 seconds
+waitForSchemaConsistency :: Client ()
+waitForSchemaConsistency = do
+ void $ retryWhileN 30 inDisagreement getSystemVersions
+ where
+ getSystemVersions :: Client (UUID, [UUID])
+ getSystemVersions = do
+ -- These two sub-queries must be made to the same node.
+ -- (comparing local from node A and peers from node B wouldn't be correct)
+ -- using the custom 'migrationPolicy' when connecting to cassandra ensures this.
+ local <- systemLocalVersion
+ peers <- systemPeerVersions
+ case local of
+ Just localVersion -> return $ (localVersion, peers)
+ Nothing -> liftIO $ throwIO $ ErrorCall
my point was that error === unsafePerformIO . Control.Exception.throwIO .
ErrorCall, and the unsafePerformIO complicates the semantics of the code.
so as long as we're in MonadIO m anyway, it is easier to reason about
what the code does if we use the latter, without the unsafePerformIO. it
actually changes the semantics slightly, because the error is now thrown in
the execution of the action, not in the construction. but this is closer to
at least my own intuition when i read this function.
does this make sense? perhaps we should have
errorIO :: (Show msg, MonadIO m) => msg -> m a
errorIO = liftIO . Control.Exception.throwIO . ErrorCall
somewhere in the libraries?
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#467 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/ABc-ahX7E-2Xfm2j3FlHyp2exaqS4zCcks5uX6vLgaJpZM4WXxc6>
.
|
Okay, I switched back to using 'error' in this one particular usage, consistent with the rest of the cassandra-util module. I'm happy to go down either the 'error' or 'errorIO' path, as long as A) it's used consistently across the module (it wasn't, which caused confusion), and B) we do this in a separate PR that explains the reasoning for the change. |
retryWhileN n f m = retrying (constantDelay 1000000 <> limitRetries n) | ||
(const (return . f)) | ||
(const m) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reminder to self: move retryWhileN
to a common module as it's (re)defined in a few places (e.g.,
wire-server/services/galley/test/integration/API/Util.hs
Lines 756 to 760 in 9914375
-- TODO: Refactor, as used also in other services | |
retryWhileN :: (MonadIO m) => Int -> (a -> Bool) -> m a -> m a | |
retryWhileN n f m = retrying (constantDelay 1000000 <> limitRetries n) | |
(const (return . f)) | |
(const m) |
Nothing -> error "No system_version in system.local (should never happen)" | ||
|
||
inDisagreement :: (UUID, [UUID]) -> Bool | ||
inDisagreement (localVersion, peers) = not $ all (== localVersion) peers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could also be written as:
inDisagreement (localVersion, peers) = any (/= localVersion) peers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one! 👌
I don't know if this actually works as intended in the context of multiple nodes, to be tested. (making the PR is a way of creating the images to test this)Rebased + squashed, see updated code/comment below.