diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 33e55e18d3c..cc61634f9a6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -103,7 +103,6 @@ import kotlin.Unit; import kotlinx.coroutines.Job; import network.loki.messenger.BuildConfig; -import nl.komponents.kovenant.Kovenant; import static nl.komponents.kovenant.android.KovenantAndroid.startKovenant; import static nl.komponents.kovenant.android.KovenantAndroid.stopKovenant; diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 0f449c9f3aa..0ab48243438 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -185,15 +185,15 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, DatabaseFactory.getSessionJobDatabase(context).persistJob(job) } - override fun markJobAsSucceeded(job: Job) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(job) + override fun markJobAsSucceeded(jobId: String) { + DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId) } - override fun markJobAsFailed(job: Job) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(job) + override fun markJobAsFailed(jobId: String) { + DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) } - override fun getAllPendingJobs(type: String): List { + override fun getAllPendingJobs(type: String): Map { return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index dda3b7d7eb0..04edd3715e7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -4,6 +4,7 @@ import android.content.ContentValues import android.content.Context import net.sqlcipher.Cursor import org.session.libsession.messaging.jobs.* +import org.session.libsignal.utilities.logging.Log import org.thoughtcrime.securesms.database.Database import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer @@ -30,19 +31,25 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) } - fun markJobAsSucceeded(job: Job) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) + fun markJobAsSucceeded(jobId: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) } - fun markJobAsFailed(job: Job) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) + fun markJobAsFailed(jobId: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) } - fun getAllPendingJobs(type: String): List { + fun getAllPendingJobs(type: String): Map { val database = databaseHelper.readableDatabase return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor -> - jobFromCursor(cursor) - } + val jobId = cursor.getString(jobID) + try { + jobId to jobFromCursor(cursor) + } catch (e: Exception) { + Log.e("Loki", "Error serializing Job of type $type",e) + jobId to null + } + }.toMap() } fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { diff --git a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt index f64aa1a032c..da604264d16 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt @@ -44,9 +44,9 @@ interface StorageProtocol { // Jobs fun persistJob(job: Job) - fun markJobAsSucceeded(job: Job) - fun markJobAsFailed(job: Job) - fun getAllPendingJobs(type: String): List + fun markJobAsSucceeded(jobId: String) + fun markJobAsFailed(jobId: String) + fun getAllPendingJobs(type: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun resumeMessageSendJobIfNeeded(messageSendJobID: String) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index f1351786182..1d8b1a71702 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -108,7 +108,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess val messageSendJob = storage.getMessageSendJob(messageSendJobID) MessageSender.handleFailedMessageSend(this.message, e) if (messageSendJob != null) { - storage.markJobAsFailed(messageSendJob) + storage.markJobAsFailed(messageSendJobID) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 9b1f65f4a7e..cffb2db7d60 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -78,10 +78,24 @@ class JobQueue : JobDelegate { return } hasResumedPendingJobs = true - val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY) + val allJobTypes = listOf(AttachmentUploadJob.KEY, + AttachmentDownloadJob.KEY, + MessageReceiveJob.KEY, + MessageSendJob.KEY, + NotifyPNServerJob.KEY + ) allJobTypes.forEach { type -> val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type) - allPendingJobs.sortedBy { it.id }.forEach { job -> + val pendingJobs = mutableListOf() + for ((id, job) in allPendingJobs) { + if (job == null) { + // job failed to serialize, remove it from the DB + handleJobFailedPermanently(id) + } else { + pendingJobs.add(job) + } + } + pendingJobs.sortedBy { it.id }.forEach { job -> Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.") queue.offer(job) // Offer always called on unlimited capacity } @@ -89,17 +103,18 @@ class JobQueue : JobDelegate { } override fun handleJobSucceeded(job: Job) { - MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(job) + val jobId = job.id ?: return + MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId) } override fun handleJobFailed(job: Job, error: Exception) { job.failureCount += 1 val storage = MessagingModuleConfiguration.shared.storage if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")} - storage.persistJob(job) if (job.failureCount == job.maxFailureCount) { - storage.markJobAsFailed(job) + handleJobFailedPermanently(job, error) } else { + storage.persistJob(job) val retryInterval = getRetryInterval(job) Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") timer.schedule(delay = retryInterval) { @@ -110,10 +125,13 @@ class JobQueue : JobDelegate { } override fun handleJobFailedPermanently(job: Job, error: Exception) { - job.failureCount += 1 + val jobId = job.id ?: return + handleJobFailedPermanently(jobId) + } + + private fun handleJobFailedPermanently(jobId: String) { val storage = MessagingModuleConfiguration.shared.storage - storage.persistJob(job) - storage.markJobAsFailed(job) + storage.markJobAsFailed(jobId) } private fun getRetryInterval(job: Job): Long { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index c695df83119..83822c4fc76 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -79,7 +79,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { override fun serialize(): Data { val kryo = Kryo() kryo.isRegistrationRequired = false - val output = Output(ByteArray(4096), -1) // maxBufferSize '-1' will dynamically grow internally if we run out of room serializing the message + val output = Output(ByteArray(4096), 10_000_000) kryo.writeClassAndObject(output, message) output.close() val serializedMessage = output.toBytes()