Suspend extensions for Vert.x Database

I was recently building a project with Vert.x (for fun) that involved storing and retrieving data from PostgreSQL. Vert.x is an absolutely amazing project and a wonderful way to bring the best of async programming to the JVM, but you can quickly end up with something approaching the “callback hell” of early Node.js code. Using the Future class helps a lot, but since I was programming in Kotlin I decided to add my own thin database layer using coroutines.

Coroutines (suspend functions in Kotlin) allow you to build amazingly complex state machines transparently. In the simplest terms, they let you write code that appears to be “blocking” but is actually compiled into a series of callbacks and state machines. In Kotlin this is especially powerful because, unlike in most languages, the scheduling of coroutines is decoupled from the language. This means that things like generator functions (function* in JavaScript) are implemented in the library API rather than in the compiler or runtime environment.
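As a small illustration of generators living in the library rather than the language, here is a sketch using the standard library's sequence builder. The fibonacci function is only an example name and is not part of the database layer described in this post.

```kotlin
// A generator built with the standard library's sequence builder.
// "yield" is an ordinary suspend function, not a language keyword.
fun fibonacci(): Sequence<Int> = sequence {
    var a = 0
    var b = 1
    while (true) {
        yield(a) // suspends here until the consumer asks for the next value
        val next = a + b
        a = b
        b = next
    }
}

fun main() {
    // The infinite loop above is safe because values are produced lazily.
    println(fibonacci().take(8).toList()) // [0, 1, 1, 2, 3, 5, 8, 13]
}
```

The loop reads like blocking code, but each call to yield hands control back to the consumer, which is the same trick the database layer relies on.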

To make this simpler I used the excellent Vert.x / Kotlin module: https://github.com/vert-x3/vertx-lang-kotlin which has some simple “coroutine friendly” utilities for Vert.x. I also decided to tie the coroutine interface to database transactions, since database actions that rely on ordering are normally executed within a transaction anyway. At its simplest, the class is just a list of suspend functions that delegate to Vert.x’s “normal” SQLConnection functions:

import io.vertx.core.json.JsonArray
import io.vertx.ext.sql.ResultSet
import io.vertx.ext.sql.SQLConnection
import io.vertx.ext.sql.UpdateResult
import io.vertx.kotlin.coroutines.awaitResult

class AsyncSQLConnection(val sqlConnection: SQLConnection) {
    // Runs block inside a transaction: commit on success, rollback on any failure.
    // Auto-commit is restored in both cases.
    suspend inline fun <R> transaction(block: AsyncSQLConnection.() -> R): R {
        setAutoCommit(false)
        try {
            val result = block()
            commit()
            return result
        } catch (err: Throwable) {
            rollback()
            throw err
        } finally {
            setAutoCommit(true)
        }
    }

    // Each wrapper suspends until the Vert.x callback fires, then returns the result or throws.
    suspend fun setAutoCommit(b: Boolean) = awaitResult<Void> { sqlConnection.setAutoCommit(b, it) }
    suspend fun execute(sql: String) = awaitResult<Void> { sqlConnection.execute(sql, it) }

    suspend fun query(sql: String) = awaitResult<ResultSet> { sqlConnection.query(sql, it) }
    suspend fun query(sql: String, params: JsonArray) = awaitResult<ResultSet> { sqlConnection.queryWithParams(sql, params, it) }

    suspend fun querySingle(sql: String) = awaitResult<JsonArray?> { sqlConnection.querySingle(sql, it) }
    suspend fun querySingle(sql: String, params: JsonArray) = awaitResult<JsonArray?> { sqlConnection.querySingleWithParams(sql, params, it) }

    suspend fun update(sql: String) = awaitResult<UpdateResult> { sqlConnection.update(sql, it) }
    suspend fun update(sql: String, params: JsonArray) = awaitResult<UpdateResult> { sqlConnection.updateWithParams(sql, params, it) }

    suspend fun commit() = awaitResult<Void> { sqlConnection.commit(it) }
    suspend fun rollback() = awaitResult<Void> { sqlConnection.rollback(it) }
}
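Each one-line wrapper above relies on awaitResult to turn a callback into a suspension point. The following is a minimal sketch of that idea using only the Kotlin standard library. It is not the module's actual implementation, and Callback, findNameAsync, awaitCallback and findName are all hypothetical names invented for the illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback shape, standing in for a Vert.x Handler<AsyncResult<T>>.
typealias Callback<T> = (result: T?, error: Throwable?) -> Unit

// Hypothetical callback-style API: reports its outcome through the callback.
fun findNameAsync(id: Int, callback: Callback<String>) {
    if (id > 0) callback("task-$id", null)
    else callback(null, IllegalArgumentException("bad id: $id"))
}

// The bridge: suspend the coroutine, then resume it from inside the callback.
suspend fun <T : Any> awaitCallback(block: (Callback<T>) -> Unit): T = suspendCoroutine { cont ->
    block { result, error ->
        when {
            error != null -> cont.resumeWithException(error)
            result != null -> cont.resume(result)
            else -> cont.resumeWithException(IllegalStateException("callback delivered neither result nor error"))
        }
    }
}

// The suspend wrapper has the same one-line shape as the wrappers in AsyncSQLConnection.
suspend fun findName(id: Int): String = awaitCallback { findNameAsync(id, it) }

fun main() {
    var name: String? = null
    // startCoroutine is the low-level stdlib entry point; real code would use launch.
    suspend { findName(7) }.startCoroutine(Continuation(EmptyCoroutineContext) { name = it.getOrThrow() })
    println(name) // task-7
}
```

A failed callback surfaces as an ordinary exception at the call site, which is what lets the transaction function use a plain try/catch for rollback.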

This class is made simpler to use by adding an extension function to the Vert.x SQLClient:

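import io.vertx.ext.sql.SQLClient
import io.vertx.ext.sql.SQLConnection
import io.vertx.kotlin.coroutines.awaitResult

// The block takes AsyncSQLConnection as its receiver, so query(...) and friends can be called unqualified.
suspend inline fun <R> SQLClient.transaction(block: AsyncSQLConnection.() -> R): R {
    val connection = awaitResult<SQLConnection> { getConnection(it) }
    try {
        return AsyncSQLConnection(connection).transaction(block)
    } finally {
        connection.close() // always release the connection, even if the block throws
    }
}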
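To make the commit and rollback ordering concrete, here is a sketch that replaces the real connection with a hypothetical RecordingConnection that only logs the calls it receives. The transaction body has the same shape as the one in AsyncSQLConnection; everything else is invented for the illustration and runs with the standard library alone.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for AsyncSQLConnection: records calls instead of talking to a database.
class RecordingConnection {
    val calls = mutableListOf<String>()

    suspend fun setAutoCommit(b: Boolean) { calls += "setAutoCommit($b)" }
    suspend fun update(sql: String) { calls += "update" }
    suspend fun commit() { calls += "commit" }
    suspend fun rollback() { calls += "rollback" }

    // Same structure as the transaction function in AsyncSQLConnection.
    suspend inline fun <R> transaction(block: RecordingConnection.() -> R): R {
        setAutoCommit(false)
        try {
            val result = block()
            commit()
            return result
        } catch (err: Throwable) {
            rollback()
            throw err
        } finally {
            setAutoCommit(true)
        }
    }
}

fun main() {
    val ok = RecordingConnection()
    val failed = RecordingConnection()
    suspend {
        ok.transaction { update("UPDATE tasks SET done = true") }
        try {
            failed.transaction<Unit> { throw IllegalStateException("boom") }
        } catch (e: IllegalStateException) {
            // the exception is rethrown to the caller after the rollback
        }
    }.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })

    println(ok.calls)     // [setAutoCommit(false), update, commit, setAutoCommit(true)]
    println(failed.calls) // [setAutoCommit(false), rollback, setAutoCommit(true)]
}
```

Because transaction is inline, the lambda can call suspend functions even though its declared type is not a suspend function type.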

Now to use it you can simply launch a coroutine on the Vert.x dispatcher and query in a style similar to classic JDBC, with the results returned directly.

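// In recent kotlinx.coroutines versions launch needs a CoroutineScope receiver
// (for example inside a CoroutineVerticle, or GlobalScope.launch).
launch(vertx.dispatcher()) {
    sqlClient.transaction {
        // querySingle suspends instead of blocking and returns the row (or null) directly
        val task = querySingle("SELECT * FROM tasks WHERE id = ?", JsonArray(listOf(id)))
    }
}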

The great part is that this code isn’t using runBlocking but also completely avoids callbacks, while the code the Kotlin compiler generates still produces fantastically clear stack traces (allowing for much easier debugging). You can further enhance the class by adding vararg overloads of the parameterized functions, as done here:

suspend fun query(sql: String, vararg params: Any?) = query(sql, JsonArray(listOf(*params)))
suspend fun querySingle(sql: String, vararg params: Any?) = querySingle(sql, JsonArray(listOf(*params)))
suspend fun update(sql: String, vararg params: Any?) = update(sql, JsonArray(listOf(*params)))
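One detail worth checking in these overloads is that the inner call resolves to the JsonArray version rather than recursing into the vararg version. The sketch below shows the same overload pair with a hypothetical Params class standing in for JsonArray and a hypothetical describe function standing in for query, so that it runs without Vert.x.

```kotlin
// Hypothetical stand-in for io.vertx.core.json.JsonArray.
class Params(val values: List<Any?>)

// Stand-in for the overload that takes a JsonArray.
fun describe(sql: String, params: Params): String = "$sql <- ${params.values}"

// Stand-in for the vararg overload: the spread operator copies the array into a list.
// The inner call picks the Params overload because Params is more specific than Any?.
fun describe(sql: String, vararg params: Any?): String = describe(sql, Params(listOf(*params)))

fun main() {
    println(describe("SELECT * FROM tasks WHERE id = ? AND done = ?", 42, false))
    // SELECT * FROM tasks WHERE id = ? AND done = ? <- [42, false]
}
```

With the vararg overloads in place, a call such as querySingle("SELECT * FROM tasks WHERE id = ?", id) no longer needs an explicit JsonArray at the call site.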