Skip to content

Commit

Permalink
fix: down syncs not being reflected in client app (#66)
Browse files Browse the repository at this point in the history
* fix: client id mismatch leading to syncing issues

* fix: client id mismatch leading to syncing issues
  • Loading branch information
DominicGBauer authored Oct 8, 2024
1 parent 85da9c6 commit 55cf059
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 27 deletions.
30 changes: 18 additions & 12 deletions core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ internal class BucketStorage(

logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" }

return db.readTransaction {
return db.writeTransaction {
if (hasCrud()) {
logger.w { "[updateLocalTarget] ps crud is not empty" }
return@readTransaction false
return@writeTransaction false
}

val seqAfter =
Expand All @@ -88,15 +88,17 @@ internal class BucketStorage(
throw AssertionError("Sqlite Sequence should not be empty")

if (seqAfter != seqBefore) {
logger.d("seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore")
// New crud data may have been uploaded since we got the checkpoint. Abort.
return@readTransaction false
return@writeTransaction false
}

db.execute(
"UPDATE ${InternalTable.BUCKETS} SET target_op = ? WHERE name='\$local'",
"UPDATE ${InternalTable.BUCKETS} SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
listOf(opId)
)
return@readTransaction true

return@writeTransaction true
}
}

Expand All @@ -113,7 +115,7 @@ internal class BucketStorage(

suspend fun getBucketStates(): List<BucketState> {
return db.getAll(
"SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0",
"SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable.BUCKETS} WHERE pending_delete = 0",
mapper = { cursor ->
BucketState(
bucket = cursor.getString(0)!!,
Expand All @@ -138,6 +140,8 @@ internal class BucketStorage(
)
}

Logger.d("[deleteBucket] Done deleting")

this.pendingBucketDeletes.value = true
}

Expand Down Expand Up @@ -206,7 +210,7 @@ internal class BucketStorage(

private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult {
val res = db.getOptional(
"SELECT powersync_validate_checkpoint(?) as result",
"SELECT powersync_validate_checkpoint(?) AS result",
parameters = listOf(JsonUtil.json.encodeToString(checkpoint)),
mapper = { cursor ->
cursor.getString(0)!!
Expand All @@ -227,11 +231,16 @@ internal class BucketStorage(
*/
private suspend fun updateObjectsFromBuckets(): Boolean {
return db.writeTransaction { tx ->
val res = tx.execute(

tx.execute(
"INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
listOf("sync_local", "")
)

val res = tx.get("select last_insert_rowid()") { cursor ->
cursor.getLong(0)!!
}

return@writeTransaction res == 1L
}
}
Expand Down Expand Up @@ -260,12 +269,9 @@ internal class BucketStorage(

db.writeTransaction { tx ->
tx.execute(
"DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)",
"INSERT INTO powersync_operations(op, data) VALUES (?, ?)", listOf("delete_pending_buckets","")
)

tx.execute(
"DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op",
)
// Executed once after start-up, and again when there are pending deletes.
pendingBucketDeletes.value = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ internal class PowerSyncDatabaseImpl(
}

return@readTransaction CrudTransaction(
crud = entries, transactionId = txId,
crud = entries,
transactionId = txId,
complete = { writeCheckpoint ->
logger.i { "[CrudTransaction::complete] Completing transaction with checkpoint $writeCheckpoint" }
handleWriteCheckpoint(entries.last().clientId, writeCheckpoint)
Expand Down Expand Up @@ -260,12 +261,12 @@ internal class PowerSyncDatabaseImpl(

if (writeCheckpoint != null && bucketStorage.hasCrud()) {
tx.execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
"UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'",
listOf(writeCheckpoint),
)
} else {
tx.execute(
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
"UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'",
listOf(bucketStorage.getMaxOpId()),
)
}
Expand Down
10 changes: 2 additions & 8 deletions core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,14 @@ internal class SyncStream(
return false
} else {
// This isolate is the only one triggering
bucketStorage.updateLocalTarget { getWriteCheckpoint() }
return true
return bucketStorage.updateLocalTarget { getWriteCheckpoint() }
}
}

private suspend fun getWriteCheckpoint(): String {
val credentials = connector.getCredentialsCached()
require(credentials != null) { "Not logged in" }
val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId'")
val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId")

val response = httpClient.get(uri) {
contentType(ContentType.Application.Json)
Expand Down Expand Up @@ -263,9 +262,7 @@ internal class SyncStream(
jsonString: String,
state: SyncStreamState
): SyncStreamState {
logger.i { "[handleInstruction] Received Instruction: $jsonString" }
val obj = JsonUtil.json.parseToJsonElement(jsonString).jsonObject

// TODO: Clean up
when {
isStreamingSyncCheckpoint(obj) -> return handleStreamingSyncCheckpoint(obj, state)
Expand Down Expand Up @@ -293,7 +290,6 @@ internal class SyncStream(
): SyncStreamState {
val checkpoint =
JsonUtil.json.decodeFromJsonElement<Checkpoint>(jsonObj["checkpoint"] as JsonElement)

state.targetCheckpoint = checkpoint
val bucketsToDelete = state.bucketSet!!.toMutableList()
val newBuckets = mutableSetOf<String>()
Expand Down Expand Up @@ -388,8 +384,6 @@ internal class SyncStream(
jsonObj: JsonObject,
state: SyncStreamState
): SyncStreamState {


val syncBuckets =
listOf<SyncDataBucket>(JsonUtil.json.decodeFromJsonElement(jsonObj["data"] as JsonElement))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.powersync.demos.powersync

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.powersync.PowerSyncDatabase
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

internal class ListContent(
Expand Down Expand Up @@ -37,7 +39,7 @@ internal class ListContent(
}

fun onItemDeleteClicked(item: ListItem) {
runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id))
tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id))
Expand All @@ -48,7 +50,7 @@ internal class ListContent(
fun onAddItemClicked() {
if (_inputText.value.isBlank()) return

runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute(
"INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.powersync.demos.powersync

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import com.powersync.PowerSyncDatabase
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

internal class ListContent(
Expand Down Expand Up @@ -37,7 +39,7 @@ internal class ListContent(
}

fun onItemDeleteClicked(item: ListItem) {
runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute("DELETE FROM $LISTS_TABLE WHERE id = ?", listOf(item.id))
tx.execute("DELETE FROM $TODOS_TABLE WHERE list_id = ?", listOf(item.id))
Expand All @@ -48,7 +50,7 @@ internal class ListContent(
fun onAddItemClicked() {
if (_inputText.value.isBlank()) return

runBlocking {
viewModelScope.launch {
db.writeTransaction { tx ->
tx.execute(
"INSERT INTO $LISTS_TABLE (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",
Expand Down

0 comments on commit 55cf059

Please sign in to comment.