Kotlin client: add support for programmatic change event errors.

This commit is contained in:
Sebastian Jeltsch 2026-04-07 10:50:34 +02:00
parent d77df8e18b
commit f044969298
2 changed files with 75 additions and 16 deletions

View file

@ -33,29 +33,49 @@ data class JwtTokenClaims(
)
@Serializable
sealed class DbEvent {
public class Update(val obj: JsonObject) : DbEvent()
public class Insert(val obj: JsonObject) : DbEvent()
public class Delete(val obj: JsonObject) : DbEvent()
public class Error(val msg: String) : DbEvent()
sealed class ChangeEvent {
public class Update(val seq: Long?, val obj: JsonObject) : ChangeEvent()
public class Insert(val seq: Long?, val obj: JsonObject) : ChangeEvent()
public class Delete(val seq: Long?, val obj: JsonObject) : ChangeEvent()
public enum class ErrorStatus(val id: Long) {
UNKNOWN(0),
FORBIDDEN(1),
LOSS(2);
companion object {
private val byId: Map<Long, ErrorStatus> = entries.associateBy { it.id }
fun fromId(id: Long?): ErrorStatus = byId[id ?: 0] ?: UNKNOWN
}
}
public class Error(val seq: Long?, val status: ErrorStatus, val message: String?) : ChangeEvent()
companion object {
fun from(obj: JsonObject): DbEvent? {
fun parse(msg: String): ChangeEvent? {
val obj: JsonObject = jsonSerializer.decodeFromString(msg)
val seq: Long? = obj.get("seq")?.jsonPrimitive?.longOrNull
val update = obj.get("Update")
if (update != null) {
return DbEvent.Update(update.jsonObject)
return ChangeEvent.Update(seq, update.jsonObject)
}
val insert = obj.get("Insert")
if (insert != null) {
return DbEvent.Insert(insert.jsonObject)
return ChangeEvent.Insert(seq, insert.jsonObject)
}
val delete = obj.get("Delete")
if (delete != null) {
return DbEvent.Delete(delete.jsonObject)
return ChangeEvent.Delete(seq, delete.jsonObject)
}
val error = obj.get("Error")
if (error != null) {
return DbEvent.Error("${error}")
val errObj = error.jsonObject
return ChangeEvent.Error(
seq,
ErrorStatus.fromId(errObj.get("status")?.jsonPrimitive?.longOrNull),
errObj.get("message")?.jsonPrimitive?.contentOrNull,
)
}
return null
@ -235,7 +255,7 @@ class RecordApi(val name: String, val client: Client) {
client.fetch("${RECORD_API}/${name}/${id.id()}", Method.delete)
}
suspend inline fun <reified T> subscribe(id: RecordId): Flow<DbEvent> {
suspend inline fun <reified T> subscribe(id: RecordId): Flow<ChangeEvent> {
val path = "${RECORD_API}/${name}/subscribe/${id.id()}"
// NOTE: We should probably push this into a Client.sse.
@ -253,8 +273,7 @@ class RecordApi(val name: String, val client: Client) {
// event.data?.takeIf { predicate }(predicate)
val data = ev.data
if (data != null) {
val obj: JsonObject = jsonSerializer.decodeFromString(data)
val event = DbEvent.from(obj)
val event = ChangeEvent.parse(data)
if (event != null) {
emit(event)
}

View file

@ -130,6 +130,46 @@ class ClientTest {
assertEquals(2, params.size)
}
@Test
fun `parse change events`() {
val ev0 =
ChangeEvent.parse(
"""{
"Error": {
"status": 1,
"message": "test"
},
"seq": 3
}"""
)
val errEvent0 = ev0 as ChangeEvent.Error
assertEquals(3, errEvent0.seq)
assertEquals("test", errEvent0.message)
assertEquals(ChangeEvent.ErrorStatus.FORBIDDEN, errEvent0.status)
val ev1 = ChangeEvent.parse("""{ "Error": { "status": 1 } }""")
val errEvent1 = ev1 as ChangeEvent.Error
assertEquals(null, errEvent1.seq)
assertEquals(ChangeEvent.ErrorStatus.FORBIDDEN, errEvent1.status)
val ev2 =
ChangeEvent.parse(
"""{
"Update": {
"col0": "val0",
"col1": 4
},
"seq": 4
}"""
)
val updateEvent = ev2 as ChangeEvent.Update
assertEquals(4, updateEvent.seq)
assertNotNull(updateEvent.obj)
}
// WARN: TrailBase binding to localhost:4000 doesn't work. ktor only finds it when bound to
// 127.0.0.1 or 0.0.0.0, no IPv6?.
@Test
@ -301,17 +341,17 @@ class ClientTest {
val id = api.create(SimpleStrictInsert("kotlin subscription test 0: =?&${now}"))
api.delete(id)
val result = mutableListOf<DbEvent>()
val result = mutableListOf<ChangeEvent>()
flow.take(2).toList(result)
assertEquals(2, result.count())
val insert: SimpleStrict =
localJsonSerializer.decodeFromJsonElement((result[0] as DbEvent.Insert).obj)
localJsonSerializer.decodeFromJsonElement((result[0] as ChangeEvent.Insert).obj)
assertEquals(insert.id, id.id())
val delete: SimpleStrict =
localJsonSerializer.decodeFromJsonElement((result[1] as DbEvent.Delete).obj)
localJsonSerializer.decodeFromJsonElement((result[1] as ChangeEvent.Delete).obj)
assertEquals(delete.id, id.id())
}
}