diff --git a/client/kotlin/lib/src/commonMain/kotlin/Library.kt b/client/kotlin/lib/src/commonMain/kotlin/Library.kt index 68782345..edbfa3cc 100644 --- a/client/kotlin/lib/src/commonMain/kotlin/Library.kt +++ b/client/kotlin/lib/src/commonMain/kotlin/Library.kt @@ -33,29 +33,49 @@ data class JwtTokenClaims( ) @Serializable -sealed class DbEvent { - public class Update(val obj: JsonObject) : DbEvent() - public class Insert(val obj: JsonObject) : DbEvent() - public class Delete(val obj: JsonObject) : DbEvent() - public class Error(val msg: String) : DbEvent() +sealed class ChangeEvent { + public class Update(val seq: Long?, val obj: JsonObject) : ChangeEvent() + public class Insert(val seq: Long?, val obj: JsonObject) : ChangeEvent() + public class Delete(val seq: Long?, val obj: JsonObject) : ChangeEvent() + + public enum class ErrorStatus(val id: Long) { + UNKNOWN(0), + FORBIDDEN(1), + LOSS(2); + + companion object { + private val byId: Map = entries.associateBy { it.id } + + fun fromId(id: Long?): ErrorStatus = byId[id ?: 0] ?: UNKNOWN + } + } + public class Error(val seq: Long?, val status: ErrorStatus, val message: String?) : ChangeEvent() companion object { - fun from(obj: JsonObject): DbEvent? { + fun parse(msg: String): ChangeEvent? { + val obj: JsonObject = jsonSerializer.decodeFromString(msg) + val seq: Long? = obj.get("seq")?.jsonPrimitive?.longOrNull + val update = obj.get("Update") if (update != null) { - return DbEvent.Update(update.jsonObject) + return ChangeEvent.Update(seq, update.jsonObject) } val insert = obj.get("Insert") if (insert != null) { - return DbEvent.Insert(insert.jsonObject) + return ChangeEvent.Insert(seq, insert.jsonObject) } val delete = obj.get("Delete") if (delete != null) { - return DbEvent.Delete(delete.jsonObject) + return ChangeEvent.Delete(seq, delete.jsonObject) } val error = obj.get("Error") if (error != null) { - return DbEvent.Error("${error}") + val errObj = error.jsonObject + return ChangeEvent.Error( + seq, + ErrorStatus.fromId(errObj.get("status")?.jsonPrimitive?.longOrNull), + errObj.get("message")?.jsonPrimitive?.contentOrNull, + ) } return null @@ -235,7 +255,7 @@ class RecordApi(val name: String, val client: Client) { client.fetch("${RECORD_API}/${name}/${id.id()}", Method.delete) } - suspend inline fun subscribe(id: RecordId): Flow { + suspend inline fun subscribe(id: RecordId): Flow { val path = "${RECORD_API}/${name}/subscribe/${id.id()}" // NOTE: We should probably push this into a Client.sse. @@ -253,8 +273,7 @@ class RecordApi(val name: String, val client: Client) { // event.data?.takeIf { predicate }(predicate) val data = ev.data if (data != null) { - val obj: JsonObject = jsonSerializer.decodeFromString(data) - val event = DbEvent.from(obj) + val event = ChangeEvent.parse(data) if (event != null) { emit(event) } diff --git a/client/kotlin/lib/src/jvmTest/kotlin/LibraryTest.kt b/client/kotlin/lib/src/jvmTest/kotlin/LibraryTest.kt index 06be6081..810019c6 100644 --- a/client/kotlin/lib/src/jvmTest/kotlin/LibraryTest.kt +++ b/client/kotlin/lib/src/jvmTest/kotlin/LibraryTest.kt @@ -130,6 +130,46 @@ class ClientTest { assertEquals(2, params.size) } + @Test + fun `parse change events`() { + val ev0 = + ChangeEvent.parse( + """{ + "Error": { + "status": 1, + "message": "test" + }, + "seq": 3 + }""" + ) + + val errEvent0 = ev0 as ChangeEvent.Error + assertEquals(3, errEvent0.seq) + assertEquals("test", errEvent0.message) + assertEquals(ChangeEvent.ErrorStatus.FORBIDDEN, errEvent0.status) + + val ev1 = ChangeEvent.parse("""{ "Error": { "status": 1 } }""") + + val errEvent1 = ev1 as ChangeEvent.Error + assertEquals(null, errEvent1.seq) + assertEquals(ChangeEvent.ErrorStatus.FORBIDDEN, errEvent1.status) + + val ev2 = + ChangeEvent.parse( + """{ + "Update": { + "col0": "val0", + "col1": 4 + }, + "seq": 4 + }""" + ) + + val updateEvent = ev2 as ChangeEvent.Update + assertEquals(4, updateEvent.seq) + assertNotNull(updateEvent.obj) + } + // WARN: TrailBase binding to localhost:4000 doesn't work. ktor only finds it when bound to // 127.0.0.1 or 0.0.0.0, no IPv6?. @Test @@ -301,17 +341,17 @@ class ClientTest { val id = api.create(SimpleStrictInsert("kotlin subscription test 0: =?&${now}")) api.delete(id) - val result = mutableListOf() + val result = mutableListOf() flow.take(2).toList(result) assertEquals(2, result.count()) val insert: SimpleStrict = - localJsonSerializer.decodeFromJsonElement((result[0] as DbEvent.Insert).obj) + localJsonSerializer.decodeFromJsonElement((result[0] as ChangeEvent.Insert).obj) assertEquals(insert.id, id.id()) val delete: SimpleStrict = - localJsonSerializer.decodeFromJsonElement((result[1] as DbEvent.Delete).obj) + localJsonSerializer.decodeFromJsonElement((result[1] as ChangeEvent.Delete).obj) assertEquals(delete.id, id.id()) } }