Skip to content

Commit

Permalink
Fixed and checked null-handling corner cases
Browse files Browse the repository at this point in the history
  • Loading branch information
matteosz committed Jan 7, 2024
1 parent 34ce01a commit fc05bc7
Show file tree
Hide file tree
Showing 29 changed files with 103 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ constructor(appDatabase: AppDatabase, application: Application) {
// An empty subject might have been created already
if (transactionsSubject.containsKey(current)) {
// Updating subject
transactionsSubject[current]?.toSerialized()?.onNext(ArrayList(transactionList))
transactionsSubject.getValue(current).toSerialized().onNext(ArrayList(transactionList))
} else {
// Creating new subject
transactionsSubject[current] = BehaviorSubject.createDefault(ArrayList(transactionList))
Expand Down Expand Up @@ -272,12 +272,12 @@ constructor(appDatabase: AppDatabase, application: Application) {
.collect(Collectors.toList())
}

fun getUserBalance(user: PublicKey?): Long {
val transactionList = transactions[user]
fun getUserBalance(user: PublicKey): Long {
val transactionList = transactions[user] ?: return 0
return transactionList
?.stream()
?.mapToLong { transaction: TransactionObject -> transaction.getSumForUser(user) }
?.sum() ?: 0
.stream()
.mapToLong { transaction: TransactionObject -> transaction.getSumForUser(user) }
.sum()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,15 @@ class ElectionRepository @Inject constructor(appDatabase: AppDatabase, applicati
val id = election.id
electionById[id] = election
electionSubjects.putIfAbsent(id, BehaviorSubject.create())
electionSubjects[id]?.toSerialized()?.onNext(election)
electionSubjects.getValue(id).toSerialized().onNext(election)
electionsSubject
.toSerialized()
.onNext(Collections.unmodifiableSet(HashSet(electionById.values)))
}

@Throws(UnknownElectionException::class)
fun getElection(electionId: String): Election {
val election = electionById[electionId]
return election ?: throw UnknownElectionException(electionId)
return electionById[electionId] ?: throw UnknownElectionException(electionId)
}

fun getElectionsSubject(): Observable<Set<Election>> {
Expand All @@ -161,8 +160,7 @@ class ElectionRepository @Inject constructor(appDatabase: AppDatabase, applicati

@Throws(UnknownElectionException::class)
fun getElectionSubject(electionId: String): Observable<Election> {
val electionObservable: Observable<Election>? = electionSubjects[electionId]
return electionObservable ?: throw UnknownElectionException(electionId)
return electionSubjects[electionId] ?: throw UnknownElectionException(electionId)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ class LAORepository @Inject constructor(appDatabase: AppDatabase, application: A
* @param channel the channel on which the Lao was created
* @return the Lao corresponding to this channel
*/
fun getLaoByChannel(channel: Channel): Lao? {
@Throws(UnknownLaoException::class)
fun getLaoByChannel(channel: Channel): Lao {
Timber.tag(TAG).d("querying lao for channel %s", channel)
return laoById[channel.extractLaoId()]
return laoById[channel.extractLaoId()] ?: throw UnknownLaoException(channel.extractLaoId())
}

/**
Expand All @@ -94,9 +95,9 @@ class LAORepository @Inject constructor(appDatabase: AppDatabase, application: A
val allLaoIds: Observable<List<String>>
get() = laosSubject

fun getLaoObservable(laoId: String): Observable<LaoView>? {
fun getLaoObservable(laoId: String): Observable<LaoView> {
subjectById.computeIfAbsent(laoId) { BehaviorSubject.create() }
return subjectById[laoId]
return subjectById.getValue(laoId)
}

@Throws(UnknownLaoException::class)
Expand All @@ -111,7 +112,7 @@ class LAORepository @Inject constructor(appDatabase: AppDatabase, application: A
if (laoFromDb == null) {
throw UnknownLaoException(id)
} else {
// Restore the lao
// Restore the lao in memory
updateLao(laoFromDb)
}
}
Expand Down Expand Up @@ -196,9 +197,9 @@ class LAORepository @Inject constructor(appDatabase: AppDatabase, application: A
* @param channel the lao channel
*/
fun updateNodes(channel: Channel) {
val nodes = getLaoByChannel(channel)!!.nodes
val nodes = getLaoByChannel(channel).nodes
channelToNodesSubject.putIfAbsent(channel, BehaviorSubject.create())
channelToNodesSubject[channel]?.onNext(nodes)
channelToNodesSubject.getValue(channel).onNext(nodes)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class MeetingRepository @Inject constructor(appDatabase: AppDatabase, applicatio
if (meetingSubjects.containsKey(id)) {
// If it exist we update the subject
Timber.tag(TAG).d("Updating existing meeting %s", meeting.name)
meetingSubjects[id]?.toSerialized()?.onNext(meeting)
meetingSubjects.getValue(id).toSerialized().onNext(meeting)
} else {
// If it does not, we create a new subject
Timber.tag(TAG).d("New meeting, subject created for %s", meeting.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class RollCallRepository @Inject constructor(appDatabase: AppDatabase, applicati
if (rollCallSubjects.containsKey(persistentId)) {
// If it exist we update the subject
Timber.tag(TAG).d("Updating existing roll call %s", rollCall.name)
rollCallSubjects[persistentId]?.toSerialized()?.onNext(rollCall)
rollCallSubjects.getValue(persistentId).toSerialized().onNext(rollCall)
} else {
// If it does not, we create a new subject
Timber.tag(TAG).d("New roll call, subject created for %s", rollCall.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,17 @@ import javax.inject.Singleton
*/
@Singleton
class ServerRepository @Inject constructor() {
private val serverByLaoId: MutableMap<String, Server>

init {
serverByLaoId = HashMap()
}
private val serverByLaoId: MutableMap<String, Server> = HashMap()

/** Add a server to the repository */
fun addServer(laoId: String, server: Server) {
serverByLaoId[laoId] = server
}

/** Get the corresponding server to the given Lao Id (if present) */
fun getServerByLaoId(laoId: String): Server? {
if (serverByLaoId.containsKey(laoId)) {
return serverByLaoId[laoId]
}
throw IllegalArgumentException("There is no backend associated with the LAO '$laoId'")
fun getServerByLaoId(laoId: String): Server {
return serverByLaoId[laoId]
?: throw IllegalArgumentException("There is no backend associated with the LAO '$laoId'")
}

val allServer: Collection<Server>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ constructor(appDatabase: AppDatabase, application: Application) {
* @return an observable set of message ids whose correspond to the set of chirp published on the
* given lao
*/
fun getChirpsOfLao(laoId: String): Observable<Set<MessageID?>> {
fun getChirpsOfLao(laoId: String): Observable<Set<MessageID>> {
return getLaoChirps(laoId).getChirpsSubject()
}

Expand Down Expand Up @@ -169,9 +169,8 @@ constructor(appDatabase: AppDatabase, application: Application) {
) {
// Chirps
private val chirps = ConcurrentHashMap<MessageID, Chirp>()
private val chirpSubjects = ConcurrentHashMap<MessageID?, Subject<Chirp>>()
private val chirpsSubject: Subject<Set<MessageID?>> =
BehaviorSubject.createDefault(emptySet<MessageID>())
private val chirpSubjects = ConcurrentHashMap<MessageID, Subject<Chirp>>()
private val chirpsSubject: Subject<Set<MessageID>> = BehaviorSubject.createDefault(emptySet())

// Reactions
val reactionByChirpId = ConcurrentHashMap<MessageID, MutableSet<Reaction>>()
Expand Down Expand Up @@ -281,7 +280,7 @@ constructor(appDatabase: AppDatabase, application: Application) {
return true
}

fun getChirpsSubject(): Observable<Set<MessageID?>> {
fun getChirpsSubject(): Observable<Set<MessageID>> {
return chirpsSubject
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ constructor(
})
}
}

// Reinsert the witness message with the new witness to update the subject
add(witnessMessage)
return true
Expand Down Expand Up @@ -426,7 +425,7 @@ constructor(
val idsToDelete =
witnessMessages.values
.stream()
.filter { witnessMessage: WitnessMessage? -> hasRequiredSignatures(witnessMessage) }
.filter { witnessMessage: WitnessMessage -> hasRequiredSignatures(witnessMessage) }
.map { obj: WitnessMessage -> obj.messageId }
.collect(Collectors.toSet())

Expand All @@ -444,7 +443,7 @@ constructor(
})

// Delete from memory
idsToDelete.forEach(Consumer { key: MessageID? -> witnessMessages.remove(key) })
idsToDelete.forEach(Consumer { key: MessageID -> witnessMessages.remove(key) })

// Publish the updated collection
witnessMessagesSubject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import timber.log.Timber
/** Represents a single websocket connection that can be closed */
open class Connection {
// Create a new subject whose purpose is to dispatch incoming messages to all subscribers
private val messagesSubject: BehaviorSubject<GenericMessage?>
private val messagesSubject: BehaviorSubject<GenericMessage>
private val manualState: BehaviorSubject<Lifecycle.State>
private val laoService: LAOService
private val disposables: CompositeDisposable
Expand All @@ -32,20 +32,20 @@ open class Connection {
disposables.add(
laoService
.observeMessage()
.doOnNext { msg: GenericMessage? ->
.doOnNext { msg: GenericMessage ->
Timber.tag(TAG).d("Received a new message from remote: %s", msg)
}
.subscribe({ t: GenericMessage? -> messagesSubject.onNext(t!!) }) { t: Throwable? ->
messagesSubject.onError(t!!)
.subscribe({ t: GenericMessage -> messagesSubject.onNext(t) }) { t: Throwable ->
messagesSubject.onError(t)
})

// Add logs on connection state events
disposables.add(
laoService
.observeWebsocket()
.subscribe(
{ event: WebSocket.Event? -> logEvent(event, url) },
{ err: Throwable? -> Timber.tag(TAG).d(err, "Error in connection %s", url) }))
{ event: WebSocket.Event -> logEvent(event, url) },
{ err: Throwable -> Timber.tag(TAG).d(err, "Error in connection %s", url) }))
}

protected constructor(connection: Connection) {
Expand All @@ -55,7 +55,7 @@ open class Connection {
messagesSubject = connection.messagesSubject
}

private fun logEvent(event: WebSocket.Event?, url: String) {
private fun logEvent(event: WebSocket.Event, url: String) {
val baseMsg = "Connection to $url"
when (event) {
is OnConnectionOpened<*> -> {
Expand Down Expand Up @@ -83,11 +83,11 @@ open class Connection {
laoService.sendMessage(msg)
}

open fun observeMessage(): Observable<GenericMessage?> {
open fun observeMessage(): Observable<GenericMessage> {
return messagesSubject
}

open fun observeConnectionEvents(): Observable<WebSocket.Event?> {
open fun observeConnectionEvents(): Observable<WebSocket.Event> {
return laoService.observeWebsocket()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ constructor(
.combineWith(
FlowableLifecycleInt(
manualState.toFlowable(BackpressureStrategy.LATEST),
schedulerProvider.computation()))) // .backoffStrategy(new
// ExponentialBackoffStrategy())
schedulerProvider.computation())))
// .backoffStrategy(new ExponentialBackoffStrategy())
.build()

// And return a bundled object of the service and the subject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ constructor(
private val schedulerProvider: SchedulerProvider
) : Disposable {
private var networkManager: MessageSender? = null
var currentUrl: String? = null
private set
var currentUrl: String = BuildConfig.DEFAULT_URL

init {
connect(BuildConfig.DEFAULT_URL)
connect(currentUrl)
}

@JvmOverloads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class LAONetworkManager(
private val requestCounter = AtomicInteger()

// A subject that represents unprocessed messages
private val unprocessed: Subject<GenericMessage?> = PublishSubject.create()
private val unprocessed: Subject<GenericMessage> = PublishSubject.create()
private val reprocessingCounter = ConcurrentHashMap<GenericMessage, Int>()
private val subscribedChannels: MutableSet<Channel>
private val disposables = CompositeDisposable()
Expand All @@ -70,7 +70,7 @@ class LAONetworkManager(
.observeConnectionEvents() // Observe the events of a connection
.subscribeOn(
schedulerProvider.io()) // Filter out events that are not related to a reconnection
.filter { event: WebSocket.Event? -> event is WebSocket.Event.OnConnectionOpened<*> }
.filter { event: WebSocket.Event -> event is WebSocket.Event.OnConnectionOpened<*> }
// Subscribe to the stream and when a connection event is received, send a subscribe
// message
// for each channel we are supposed to be subscribed to.
Expand Down Expand Up @@ -98,8 +98,8 @@ class LAONetworkManager(
// with a delay of 5 seconds to give priority to new messages.
unprocessed.delay(
REPROCESSING_DELAY.toLong(), TimeUnit.SECONDS, schedulerProvider.computation()))
.filter { obj: GenericMessage? -> obj is Broadcast } // Filter the Broadcast
.map { obj: GenericMessage? -> obj as Broadcast }
.filter { obj: GenericMessage -> obj is Broadcast } // Filter the Broadcast
.map { obj: GenericMessage -> obj as Broadcast }
.subscribeOn(schedulerProvider.newThread())
.subscribe({ broadcast: Broadcast -> handleBroadcast(broadcast) }) { error: Throwable ->
Timber.tag(TAG).d(error, "Error on processing message")
Expand Down Expand Up @@ -157,14 +157,14 @@ class LAONetworkManager(
Timber.tag(TAG).d("Removing %s from subscriptions", channel)
subscribedChannels.remove(channel)
}
.doOnError { error: Throwable? -> Timber.tag(TAG).d(error, "error unsubscribing") }
.doOnError { error: Throwable -> Timber.tag(TAG).d(error, "error unsubscribing") }
.ignoreElement()
}

override val connectEvents: Observable<WebSocket.Event?>
override val connectEvents: Observable<WebSocket.Event>
get() = multiConnection.observeConnectionEvents()

override val subscriptions: Set<Channel?>
override val subscriptions: Set<Channel>
get() = HashSet(subscribedChannels)

override fun extendConnection(peerAddressList: List<PeerAddress>) {
Expand Down Expand Up @@ -203,15 +203,13 @@ class LAONetworkManager(
}
}

private fun handleMessages(messages: List<MessageGeneral>, channel: Channel?) {
private fun handleMessages(messages: List<MessageGeneral>, channel: Channel) {
fun handleError(e: Exception) {
Timber.tag(TAG).e(e, "Error while handling received catchup message")
}
for (msg in messages) {
try {
messageHandler.handleMessage(this, channel!!, msg)
} catch (e: Exception) {
Timber.tag(TAG).e(e, "Error while handling received catchup message")
messageHandler.handleMessage(this, channel, msg)
} catch (e: DataHandlingException) {
handleError(e)
} catch (e: UnknownLaoException) {
Expand All @@ -235,8 +233,8 @@ class LAONetworkManager(
// certain the reply will be processed and the message is only sent when an observer
// subscribes to the request answer.
.doOnSubscribe { multiConnection.sendMessage(query) }
.filter { obj: GenericMessage? -> obj is Answer } // Filter for Answers
.map { obj: GenericMessage? ->
.filter { obj: GenericMessage -> obj is Answer } // Filter for Answers
.map { obj: GenericMessage ->
obj as Answer
} // This specific request has an id, only let the related Answer pass
.filter { answer: Answer -> answer.id == query.requestId }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.reactivex.Observable
interface LAOService {
@Send fun sendMessage(msg: Message)

@Receive fun observeMessage(): Observable<GenericMessage?>
@Receive fun observeMessage(): Observable<GenericMessage>

@Receive fun observeWebsocket(): Observable<WebSocket.Event?>
@Receive fun observeWebsocket(): Observable<WebSocket.Event>
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ interface MessageSender : Disposable {
fun unsubscribe(channel: Channel): Completable

/** @return an Observable of WebSocket events of the underlying connection */
val connectEvents: Observable<WebSocket.Event?>
val connectEvents: Observable<WebSocket.Event>

/** @return the list of channels we subscribed to */
val subscriptions: Set<Channel?>
val subscriptions: Set<Channel>

/**
* Extend the connection by connecting to the peers of a server upon a GreetLao
Expand Down
Loading

0 comments on commit fc05bc7

Please sign in to comment.