Different programming languages offer various concurrency constructs, each bringing a unique approach to solving
concurrency challenges. One widely used construct is the Future. A Future represents an asynchronous computation
that is triggered immediately upon creation. Once the computation completes, the Future is populated with the result.
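Here is a minimal sketch of the "triggered immediately" behaviour. It uses only the Scala standard library and its global ExecutionContext, which is an assumption made for brevity (the post later uses a cached thread pool). The name EagerFutureDemo is hypothetical. A latch is counted down inside the Future body, and we wait on that latch before ever asking the Future for its value.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerFutureDemo {
  // Returns (did the body start without anyone asking for the result?, the result)
  def run(): (Boolean, Int) = {
    // Counted down from inside the Future body, so we can observe that it ran
    val started = new CountDownLatch(1)

    // Future.apply submits the body to the ExecutionContext right away
    val answer: Future[Int] = Future {
      started.countDown()
      21 * 2
    }

    // No Await and no callback registered yet, but the body still runs
    val ranEagerly = started.await(5, TimeUnit.SECONDS)
    (ranEagerly, Await.result(answer, 5.seconds))
  }
}
```

Calling EagerFutureDemo.run() returns (true, 42). The work lives inside a method rather than in the object's initializer for a reason. A Future that touches an object's fields while that object is still being initialized can deadlock on the JVM's class-initialization lock.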
While Future is useful for many scenarios, it does not allow deferring the actual execution of the computation until a
specific condition is met. What if we want to start the computation only when a timer expires or when a condition
becomes true? This is where a Promise comes in.
Unlike a Future, which starts its computation immediately, a Promise lets us complete a Future manually at a later time. This is useful when execution must be delayed until a condition is met.
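As a rough sketch of deferring work until a timer expires, consider the helper below. It hands back promise.future right away and only evaluates the computation when a java.util.concurrent scheduler fires. The names afterDelay and DeferredByTimer are hypothetical and do not come from any library.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.util.Try

object DeferredByTimer {
  // Hypothetical helper: `body` is evaluated only after `delayMs` milliseconds
  def afterDelay[T](delayMs: Long)(body: => T): Future[T] = {
    val promise = Promise[T]()
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        // Run the deferred computation now and complete the Promise with its
        // result, or with the exception it threw (captured by Try)
        promise.complete(Try(body))
        scheduler.shutdown()
      }
    }, delayMs, TimeUnit.MILLISECONDS)
    // The caller gets the Future immediately; it completes when the timer fires
    promise.future
  }
}
```

For example, DeferredByTimer.afterDelay(300)(6 * 7) returns a Future that is typically still incomplete when the call returns. It completes with 42 roughly 300 ms later. Creating a scheduler per call is wasteful, so a real program would share one.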
To illustrate the use of a Promise, I’ll build a batched database writer. Instead of inserting every row individually, we will:
- Buffer insert requests in a queue.
- Trigger the actual write operation only when more than 3 records accumulate.
These batching constructs are very common in real-world applications (think of the Kafka producer, which batches records before sending them). They reduce load on the data store, use network bandwidth more effectively, and improve throughput. The idea is to build a non-trivial program that uses this programming language construct.
To build such a system, we first define an insert API. I’ll use Scala 2 with the Postgres JDBC driver and no other dependencies for this illustration. Hopefully it will not be too hard to translate the ideas into another programming language.
Our application will take a PostgreSQL INSERT statement and a sequence of row values to use with that statement.
Let’s start by sketching what that insert API will look like:
object BufferedWriter extends App {
def insert(sql: String, rows: Seq[Seq[Any]]): Unit = {
}
}
Next, we want to buffer each (insert query, rows to insert) pair. Let’s do that using a ListBuffer. We want an
unbounded queue at this point.
import scala.collection.mutable.ListBuffer
object BufferedWriter extends App {
val writePendingQueue = new ListBuffer[(String, Seq[Seq[Any]])]()
...
}
This queue stores (SQL Statement, Row Data) tuples. Now, let’s define logic to process the queue.
But first, let’s take a short detour and set up our PG connection and schema. We’ll create a users table with a
couple of fields. In this example, the users table stores “balance” information:
~ % psql -h localhost -U postgres
psql (17.2, server 16.4 (Debian 16.4-1.pgdg120+2))
Type "help" for help.
postgres=# CREATE TABLE public.users (
postgres(# id SERIAL PRIMARY KEY,
postgres(# name VARCHAR(255) NOT NULL,
postgres(# balance NUMERIC(10,2) NOT NULL,
postgres(# created_date DATE NOT NULL
postgres(# );
CREATE TABLE
Next, let’s add code to talk to PG:
import java.sql.{Connection, DriverManager, SQLException}
import java.time.LocalDateTime
...
object BufferedWriter extends App {
...
private def getConnection: Connection = {
Class.forName("org.postgresql.Driver")
try {
DriverManager.getConnection("jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")
} catch {
case ex: SQLException =>
println(s"Failed to obtain connection: ${ex.getMessage}")
throw ex
}
}
private def runUseCase(): Unit = {
val insertSql = "INSERT INTO users (id, name, balance, created_date) VALUES (?, ?, ?, ?)"
val rowsT0 = Seq(
Seq(4, "Jai", 4000.0, LocalDateTime.now()),
Seq(5, "Veeru", 5000.0, LocalDateTime.now()),
Seq(6, "Samba", 6000.0, LocalDateTime.now()),
)
val rowsT1 = Seq(
Seq(1, "Ram", 1000.0, LocalDateTime.now()),
Seq(2, "Shyam", 2000.0, LocalDateTime.now()),
Seq(3, "Krishna", 3000.0, LocalDateTime.now()),
)
insert(insertSql, rowsT0)
insert(insertSql, rowsT1)
}
runUseCase()
}
Note that I’m conveniently assuming you have access to a Postgres instance with a database called postgres
that can be accessed with the username and password postgres:postgres. I know, very safe 😁
So far, so good. Now comes the key question. When inserting into the Postgres users table, the key feature I want is
that the insert should happen only when more than 3 records have accumulated in the write-pending queue. That means the
insert routine must first add the rows and their associated insert query to the write-pending queue, and then check
whether the queue holds more than 3 rows. If so, it flushes all pending writes to PG. Let’s add that next:
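object BufferedWriter extends App {
// Flush once more than 3 rows are pending (i.e. at least 4, given the >= check below)
val BATCH_SIZE = 4
...
def insert(sql: String, rows: Seq[Seq[Any]]): Unit = {
this.synchronized {
val totalRows = addToQueue(sql, rows)
if (totalRows >= BATCH_SIZE) {
writeRows(writePendingQueue)
// Start a fresh batch so already-written rows are not written again
writePendingQueue.clear()
}
}
}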
private def addToQueue(sql: String, row: Seq[Seq[Any]]): Int = {
writePendingQueue.append((sql, row))
writePendingQueue.foldLeft(0) { (acc, elem) =>
acc + elem._2.size
}
}
private def writeRows(queue: ListBuffer[(String, Seq[Seq[Any]])]): Int = {
var conn: Connection = null
var totalRowsWritten = 0
try {
conn = getConnection
conn.setAutoCommit(false)
val groupedBySQL = queue.groupBy(_._1)
groupedBySQL.foreach { case (sql, entries) =>
val stmt = conn.prepareStatement(sql)
try {
val allRows = entries.flatMap(_._2)
allRows.foreach { row =>
for (i <- row.indices) {
stmt.setObject(i + 1, row(i))
}
stmt.addBatch()
}
val batchResults = stmt.executeBatch()
totalRowsWritten += batchResults.sum
} finally {
stmt.close()
}
}
conn.commit()
println(s"Transaction committed with $totalRowsWritten rows")
totalRowsWritten
} catch {
case ex: SQLException =>
println(s"SQL Exception: ${ex.getMessage}")
if (conn != null) conn.rollback()
throw ex
case ex: Exception =>
println(s"General Exception: ${ex.getMessage}")
if (conn != null) conn.rollback()
throw ex
} finally {
if (conn != null) {
conn.close()
}
}
}
....
}
We now have a running program. Let’s run it and see the rows inserted:
postgres=# select * from users;
1 | Jai | 4000.00 | 2025-03-09
2 | Veeru | 5000.00 | 2025-03-09
3 | Samba | 6000.00 | 2025-03-09
4 | Ram | 1000.00 | 2025-03-09
5 | Shyam | 2000.00 | 2025-03-09
6 | Krishna | 3000.00 | 2025-03-09
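The grouping step inside writeRows turns many queued insert calls into one JDBC batch per distinct SQL string. Here it is in isolation, with no database involved. The audit_log statement is a hypothetical second table, added only to produce a second group.

```scala
import java.time.LocalDate
import scala.collection.mutable.ListBuffer

object GroupingDemo {
  val usersSql = "INSERT INTO users (id, name, balance, created_date) VALUES (?, ?, ?, ?)"
  // Hypothetical second table, only here to produce a second group
  val auditSql = "INSERT INTO audit_log (event) VALUES (?)"
  val day = LocalDate.of(2025, 3, 9)

  // Same shape as writePendingQueue: one (SQL, rows) entry per insert call
  val queue = ListBuffer[(String, Seq[Seq[Any]])](
    (usersSql, Seq(Seq(4, "Jai", 4000.0, day), Seq(5, "Veeru", 5000.0, day))),
    (auditSql, Seq(Seq("users-batch-queued"))),
    (usersSql, Seq(Seq(1, "Ram", 1000.0, day)))
  )

  // What writeRows does: one group per distinct SQL string, with the rows of
  // every entry in that group flattened into a single batch
  val batches: Map[String, Seq[Seq[Any]]] =
    queue.groupBy(_._1).map { case (sql, entries) => sql -> entries.flatMap(_._2).toSeq }
}
```

The resulting batches map has two keys. The users entry holds the rows from both users inserts in queue order (ids 4, 5, then 1), because groupBy keeps the relative order of elements within each group. The order of the keys themselves is not guaranteed. That does not matter here, since each statement is executed as its own batch.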
At this point the insert routine is not asynchronous yet. We can consider wrapping the insert logic in a Future and
returning that Future to the caller. We’ll have to:
- rewrite the insert routine to create the Future and return it to the caller, and
- amend runUseCase to handle the Future value and callback code. We’ll remove the current insert instructions and add new ones to handle the Future.
...
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success}
object BufferedWriter extends App {
...
private val es : ExecutorService = Executors.newCachedThreadPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(es)
...
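def insert(sql: String, rows: Seq[Seq[Any]]): Future[Int] = {
// Enqueue the rows and decide, under the lock, whether the batch is full
val shouldWrite = this.synchronized {
addToQueue(sql, rows) >= BATCH_SIZE
}
if (shouldWrite) {
// Snapshot and clear the queue so the write below owns these entries
val queueCopy = this.synchronized {
val copy = ListBuffer[(String, Seq[Seq[Any]])]() ++= writePendingQueue
writePendingQueue.clear()
copy
}
// Run the JDBC write on the thread pool; the caller gets the Future right away
Future {
writeRows(queueCopy)
}
} else {
// Nothing was written on behalf of this call
Future.successful(0)
}
}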
...
private def runUseCase(): Unit = {
...
val f1 = insert(insertSql, rowsT0)
f1.onComplete {
case Success(count) => println(s"First batch: Successfully inserted $count rows")
case Failure(ex) => println(s"First batch: Failed to insert rows - ${ex.getMessage}")
}
val f2 = insert(insertSql, rowsT1)
f2.onComplete {
case Success(count) => println(s"Second batch: Successfully inserted $count rows")
case Failure(ex) => println(s"Second batch: Failed to insert rows - ${ex.getMessage}")
}
Await.result(f1, scala.concurrent.duration.Duration.Inf)
Await.result(f2, scala.concurrent.duration.Duration.Inf)
println(s"Rows inserted in first operation: ${f1.value}")
println(s"Rows inserted in second operation: ${f2.value}")
}
runUseCase()
es.shutdown()
}
This is how this code will behave. The first insert adds 3 rows to the queue, which is not more than 3, so it returns
an already-completed Future (Future.successful(0)) without writing anything to Postgres. The second insert brings the
queue to 6 rows, so it spawns a Future that starts executing immediately (technically, as soon as a thread is available
from the cached thread pool) and completes after writing all 6 rows to Postgres. So we have added async behavior to the
insert API, but we immediately notice a problem. The following is printed on the console:
First batch: Successfully inserted 0 rows
Transaction committed with 6 rows
Second batch: Successfully inserted 6 rows
Rows inserted in first operation: Some(Success(0))
Rows inserted in second operation: Some(Success(6))
This indicates that the first Future completed immediately, without writing the rows associated with the insert that returned it. The second Future completed after that, writing all 6 rows, even those associated with the first insert operation! This is not the behavior we wanted. We want each Future to complete exactly when its own rows are actually inserted.
So how can we accomplish that? Enter Promises. A Promise is an object that:
- Has a Future associated with it, which you can derive immediately and hand back to the caller
- Can be completed at a later time. Once it’s completed, its Future is completed too, with the value of the Promise.
This is exactly what we need! We need a way to complete the two Futures only when the batch is full. When the batch is full, we will perform the writes and then complete the Promise associated with each set of rows. Here is how that looks in code:
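The two properties of a Promise listed above map directly onto the Promise API in scala.concurrent. Here is a minimal sketch; the name PromiseBasics is hypothetical.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object PromiseBasics {
  def run(): (Boolean, Boolean, Int) = {
    val promise = Promise[Int]()
    // 1. Derive the Future immediately and hand it to the caller
    val future: Future[Int] = promise.future
    val completedBefore = future.isCompleted // nothing has completed the Promise yet
    // 2. Later (e.g. once the batch is written), complete the Promise
    promise.success(3)
    val completedAfter = future.isCompleted // completing the Promise completed its Future
    (completedBefore, completedAfter, Await.result(future, 1.second))
  }
}
```

PromiseBasics.run() returns (false, true, 3). A Promise can be completed only once. A second success or failure call throws an IllegalStateException, while trySuccess and tryFailure return false instead.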
...
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
object BufferedWriterPromise extends App {
...
private val writePendingQueue = new ListBuffer[(String, Seq[Seq[Any]], Promise[Int])]()
...
def insert(sql: String, rows: Seq[Seq[Any]]): Future[Int] = {
val promise = Promise[Int]()
val (shouldWrite, totalRows) = this.synchronized {
val totalRows = addToQueue(sql, rows, promise)
(totalRows >= BATCH_SIZE, totalRows)
}
if (shouldWrite) {
val queueCopy = this.synchronized {
val copy = ListBuffer[(String, Seq[Seq[Any]], Promise[Int])]() ++= writePendingQueue
writePendingQueue.clear()
copy
}
Future {
val rowsWritten = writeRows(queueCopy)
// Complete all promises with the number of rows they contributed
queueCopy.foreach { case (_, batchRows, p) =>
p.success(batchRows.size)
}
rowsWritten
}.onComplete {
case Failure(ex) =>
// Complete all promises with failure in case of error
queueCopy.foreach { case (_, _, p) => p.failure(ex) }
case _ =>
}
}
promise.future
}
private def addToQueue(sql: String, rows: Seq[Seq[Any]], promise: Promise[Int]): Int = {
writePendingQueue.append((sql, rows, promise))
writePendingQueue.foldLeft(0) { (acc, elem) =>
acc + elem._2.size
}
}
private def writeRows(queue: ListBuffer[(String, Seq[Seq[Any]], Promise[Int])]): Int = {
...
}
....
}
The differences now are:
- writePendingQueue now holds a Promise object associated with each insert call. Earlier the queue held (SQL Statement, Row Data) tuples; now it holds (SQL Statement, Row Data, Promise) tuples.
- insert creates a Promise object and then adds the batch to the writePendingQueue. If the queue is full, it creates a Future to perform the actual write to Postgres. Once that write completes, all queued Promises are completed, either as a success or as a failure, which in turn completes the Future associated with each one.
- The signature of writeRows has changed, even though it does not actually need to operate on the Promise object.
- addToQueue has to be changed to add the Promise to the pending queue item tuples.
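To experiment with the Promise-based flow without a Postgres instance, here is a sketch of the same idea with the JDBC write replaced by a caller-supplied function. PromiseBatcher, writeBatch and PromiseBatcherDemo are hypothetical names, and the threshold of 4 mirrors "more than 3 rows". Unlike the version above, the size check and the queue snapshot happen in a single synchronized block. That way the decision to flush and the snapshot it acts on cannot get out of step.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical in-memory stand-in for BufferedWriterPromise: `writeBatch`
// plays the role of writeRows, so no database is needed
class PromiseBatcher(batchSize: Int, writeBatch: Seq[Seq[Any]] => Int)
                    (implicit ec: ExecutionContext) {
  private val pending = ListBuffer[(Seq[Seq[Any]], Promise[Int])]()

  def insert(rows: Seq[Seq[Any]]): Future[Int] = {
    val promise = Promise[Int]()
    // Enqueue and, if the batch is now full, take ownership of every queued
    // entry, all inside one critical section
    val toFlush: List[(Seq[Seq[Any]], Promise[Int])] = synchronized {
      pending.append((rows, promise))
      if (pending.map(_._1.size).sum >= batchSize) {
        val snapshot = pending.toList
        pending.clear()
        snapshot
      } else Nil
    }
    if (toFlush.nonEmpty) {
      Future(writeBatch(toFlush.flatMap(_._1))).onComplete {
        // Each caller learns how many of its own rows were written
        case Success(_)  => toFlush.foreach { case (r, p) => p.success(r.size) }
        // Every caller in the batch sees the same failure
        case Failure(ex) => toFlush.foreach { case (_, p) => p.failure(ex) }
      }
    }
    // Returned immediately; completes only when this call's rows are flushed
    promise.future
  }
}

object PromiseBatcherDemo {
  def run(): (Boolean, Int, Int, Int) = {
    import scala.concurrent.ExecutionContext.Implicits.global
    val written = new AtomicInteger(0)
    // Stub writer: pretend every row in the batch was inserted
    val batcher = new PromiseBatcher(4, rows => { written.addAndGet(rows.size); rows.size })
    val f1 = batcher.insert(Seq(Seq(4, "Jai"), Seq(5, "Veeru"), Seq(6, "Samba")))
    val firstPendingEarly = !f1.isCompleted // 3 rows: below the threshold of 4
    val f2 = batcher.insert(Seq(Seq(1, "Ram"), Seq(2, "Shyam"), Seq(3, "Krishna")))
    (firstPendingEarly, Await.result(f1, 5.seconds), Await.result(f2, 5.seconds), written.get)
  }
}
```

PromiseBatcherDemo.run() returns (true, 3, 3, 6). The first Future stays pending after 3 rows. Both Futures then complete, each with its own row count, only once the stubbed write of all 6 rows finishes.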
The key takeaway is that we can now ensure Futures are properly completed only when their corresponding rows have been successfully written to Postgres 🎉🥳
Side Notes & Considerations
While this post focuses on illustrating the use of Promises, there are a few additional considerations worth noting:
- Lack of type safety in the insert API: The insert API does not enforce type safety for row (field) values. Callers could insert a sequence like (Int, Int, DateTime), even if the database schema expects (DateTime, Int, Int). A more type-safe approach would prevent such mismatches, but I’ll leave that discussion for another post.
- Unbounded queue considerations: The implementation uses an unbounded queue, which introduces potential risks:
  - Memory overflow if too many items accumulate
  - No failsafe mechanism if the queue never reaches the batch limit, leading to potential data loss or indefinite delays
- Simplistic writeRows implementation: The writeRows routine is intentionally kept basic and does not fully leverage Scala’s idiomatic patterns. Additionally, the PostgreSQL insert logic via JDBC APIs lacks type safety. While there are more refined approaches, they are outside the scope of this discussion.
- Mixing functional and imperative styles: The code is guilty of mixing the two styles. Making the code purely functional would mean leveraging a library like ZIO, but that would complicate the code and distract from our core illustration.
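One way to close the "no failsafe" gap noted above is to pair the size threshold with a timer that drains whatever is pending. The sketch below is a hypothetical, simplified TimedFlusher, not part of the code above, built on a ScheduledExecutorService. To stay short, it counts items rather than rows and leaves out Promises. It also does not serialize flushes, so a timer flush and a size-triggered flush can run concurrently.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical sketch: flush when `batchSize` items are pending OR every
// `maxDelayMs` milliseconds, whichever comes first. `flush` stands in for writeRows.
class TimedFlusher[A](batchSize: Int, maxDelayMs: Long, flush: List[A] => Unit) {
  private val pending = ListBuffer[A]()
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // Failsafe: periodically drain the queue even if it never fills up
  scheduler.scheduleAtFixedRate(new Runnable {
    def run(): Unit =
      // An exception escaping a fixed-rate task would suppress all later runs
      try drain() catch { case NonFatal(e) => e.printStackTrace() }
  }, maxDelayMs, maxDelayMs, TimeUnit.MILLISECONDS)

  def add(item: A): Unit = {
    val full = synchronized { pending.append(item); pending.size >= batchSize }
    if (full) drain() // size-based flush, as in the code above
  }

  // Take everything that is pending under the lock, then flush outside it
  private def drain(): Unit = {
    val batch = synchronized { val b = pending.toList; pending.clear(); b }
    if (batch.nonEmpty) flush(batch)
  }

  // Stop the timer and flush any leftovers
  def close(): Unit = { scheduler.shutdown(); drain() }
}

object TimedFlusherDemo {
  def run(): List[String] = {
    val flushed = new LinkedBlockingQueue[List[String]]()
    val flusher = new TimedFlusher[String](batchSize = 100, maxDelayMs = 200, flush = b => flushed.put(b))
    flusher.add("a")
    flusher.add("b") // far below batchSize, so only the timer will flush these
    val batch = flushed.poll(5, TimeUnit.SECONDS)
    flusher.close()
    batch
  }
}
```

TimedFlusherDemo.run() adds two items to a flusher whose batch size is 100. It gets them back as List("a", "b") once the 200 ms timer fires, even though the size threshold was never reached.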
These trade-offs were made to keep the focus on demonstrating Promises in action. However, they’re important considerations when applying this pattern in a production setting.