Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

show attempts for chat message #732

Merged
merged 8 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions freedata_gui/src/components/chat_conversations.vue
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import pinia from "../store/index";
import { useChatStore } from "../store/chatStore.js";
import {
getBeaconDataByCallsign,
setFreedataMessageAsUnread,
setFreedataMessageAsRead,
getFreedataMessages,
} from "../js/api.js";
import { ref } from "vue";
Expand All @@ -25,7 +25,7 @@ async function setMessagesAsRead(callsign) {
if (typeof messages !== "undefined") {
messages.forEach((message) => {
if (typeof message.is_read !== "undefined" && !message.is_read) {
setFreedataMessageAsUnread(message.id);
setFreedataMessageAsRead(message.id);
message.is_read = true;
}
});
Expand Down
8 changes: 7 additions & 1 deletion freedata_gui/src/components/chat_messages_sent.vue
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@

<div class="card-footer p-0 bg-secondary border-top-0">
<p class="text p-0 m-0 me-1 text-end">
{{ message.status }} | {{ getDateTime }}
<span class="badge badge-primary mr-2" v-bind:class="{
'bg-danger': message.status == 'failed',
'bg-primary': message.status == 'transmitting',
'bg-secondary': message.status == 'transmitted',
}"
>{{ message.status }}</span>
| attempt: {{ message.attempt + 1 }} | {{ getDateTime }}
</p>
<!-- Display formatted timestamp in card-footer -->
</div>
Expand Down
6 changes: 3 additions & 3 deletions freedata_gui/src/js/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,11 @@ export async function sendFreedataMessage(destination, body, attachments) {
}

export async function retransmitFreedataMessage(id) {
return await apiPost(`/freedata/messages/${id}`);
return await apiPatch(`/freedata/messages/${id}/retransmit`);
}

export async function setFreedataMessageAsUnread(id) {
return await apiPatch(`/freedata/messages/${id}`);
export async function setFreedataMessageAsRead(id) {
return await apiPatch(`/freedata/messages/${id}`, { is_read: true });
}

export async function deleteFreedataMessage(id) {
Expand Down
4 changes: 2 additions & 2 deletions freedata_server/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,5 @@ def modem_failed(self):
event = {"freedata_server": "failed"}
self.broadcast(event)

def freedata_message_db_change(self):
self.broadcast({"message-db": "changed"})
def freedata_message_db_change(self, message_id=None):
self.broadcast({"message-db": "changed", "message_id": message_id})
47 changes: 45 additions & 2 deletions freedata_server/message_system_db_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# database_manager.py
import sqlite3

from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker
from threading import local
from message_system_db_model import Base, Station, Status
from message_system_db_model import Base, Station, Status, P2PMessage
import structlog
import helpers
import os
Expand Down Expand Up @@ -54,6 +54,49 @@ def initialize_default_values(self):
finally:
session.remove()

def database_repair_and_cleanup(self):
session = self.get_thread_scoped_session()
try:

# Fetch the 'failed' status ID
failed_status = session.query(Status).filter_by(name="failed").first()
if not failed_status:
raise ValueError("Failed status not found in the database")

# Fetch the 'transmitting' status ID
transmitting_status = session.query(Status).filter_by(name="transmitting").first()
if transmitting_status:
# Check if any messages have the status "transmitting" and update them to "failed"
messages_to_update = session.query(P2PMessage).filter_by(status_id=transmitting_status.id).all()
for message in messages_to_update:
message.status_id = failed_status.id

session.commit()
len_repaired_message = len(messages_to_update)
if len_repaired_message > 0:
self.log(f"Repaired {len_repaired_message} messages ('transmitting' to 'failed')")

# Vacuum the database to reclaim space
session.execute(text("VACUUM"))
self.log("Database vacuumed successfully")

# Reindex all tables to improve query performance
session.execute(text("REINDEX"))
self.log("Database reindexed successfully")

# Perform an integrity check on the database
result = session.execute(text("PRAGMA integrity_check")).fetchone()
if result[0] == 'ok':
self.log("Database integrity check passed")
else:
self.log("Database integrity check failed", isWarning=True)

except Exception as e:
session.rollback()
self.log(f"An error occurred while checking databse: {e}", isWarning=True)
finally:
session.remove()

def log(self, message, isWarning=False):
msg = f"[{type(self).__name__}]: {message}"
logger = self.logger.warn if isWarning else self.logger.info
Expand Down
57 changes: 30 additions & 27 deletions freedata_server/message_system_db_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,44 @@ def add_message(self, message_data, statistics, direction='receive', status=None

session.commit()
self.log(f"Added data to database: {new_message.id}")
self.event_manager.freedata_message_db_change()
self.event_manager.freedata_message_db_change(message_id=new_message.id)
return new_message.id
except IntegrityError as e:
session.rollback() # Roll back the session to a clean state
self.log(f"Message with ID {message_data['id']} already exists in the database.", isWarning=True)
return None # or you might return the existing message's ID or details

return None

except Exception as e:
session.rollback()
self.log(f"error adding new message to database with error: {e}", isWarning=True)
finally:
session.remove()


def get_all_messages(self):
def get_all_messages(self, filters=None):
session = self.get_thread_scoped_session()
try:
messages = session.query(P2PMessage).all()
query = session.query(P2PMessage)

if filters:
if 'id' in filters:
query = query.filter(P2PMessage.id == filters['id'])
if 'callsign' in filters:
callsign_filter = filters['callsign']
query = query.filter(
(P2PMessage.origin_callsign.contains(callsign_filter)) |
(P2PMessage.via_callsign.contains(callsign_filter)) |
(P2PMessage.destination_callsign.contains(callsign_filter))
)
if 'origin_callsign' in filters:
query = query.filter(P2PMessage.origin_callsign.contains(filters['origin_callsign']))
if 'via_callsign' in filters:
query = query.filter(P2PMessage.via_callsign.contains(filters['via_callsign']))
if 'destination_callsign' in filters:
query = query.filter(P2PMessage.destination_callsign.contains(filters['destination_callsign']))
if 'direction' in filters:
query = query.filter(P2PMessage.direction.contains(filters['direction']))

messages = query.all()
return [message.to_dict() for message in messages]

except Exception as e:
Expand All @@ -83,9 +102,9 @@ def get_all_messages(self):
finally:
session.remove()

def get_all_messages_json(self):
messages_dict = self.get_all_messages()
messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict}
def get_all_messages_json(self, filters=None):
messages_dict = self.get_all_messages(filters)
messages_with_header = {'total_messages': len(messages_dict), 'messages': messages_dict}
return messages_with_header

def get_message_by_id(self, message_id):
Expand Down Expand Up @@ -114,7 +133,7 @@ def delete_message(self, message_id):
session.delete(message)
session.commit()
self.log(f"Deleted: {message_id}")
self.event_manager.freedata_message_db_change()
self.event_manager.freedata_message_db_change(message_id=message_id)
return {'status': 'success', 'message': f'Message {message_id} deleted'}
else:
return {'status': 'failure', 'message': 'Message not found'}
Expand Down Expand Up @@ -146,10 +165,9 @@ def update_message(self, message_id, update_data):
if 'priority' in update_data:
message.priority = update_data['priority']


session.commit()
self.log(f"Updated: {message_id}")
self.event_manager.freedata_message_db_change()
self.event_manager.freedata_message_db_change(message_id=message_id)
return {'status': 'success', 'message': f'Message {message_id} updated'}
else:
return {'status': 'failure', 'message': 'Message not found'}
Expand Down Expand Up @@ -209,21 +227,6 @@ def increment_message_attempts(self, message_id, session=None):
if own_session:
session.remove()

def mark_message_as_read(self, message_id):
session = self.get_thread_scoped_session()
try:
message = session.query(P2PMessage).filter_by(id=message_id).first()
if message:
message.is_read = True
session.commit()
self.log(f"Marked message {message_id} as read")
else:
self.log(f"Message with ID {message_id} not found")
except Exception as e:
session.rollback()
self.log(f"An error occurred while marking message {message_id} as read: {e}")
finally:
session.remove()

def set_message_to_queued_for_callsign(self, callsign):
session = self.get_thread_scoped_session()
Expand Down
39 changes: 30 additions & 9 deletions freedata_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,34 +270,54 @@ def get_post_radio():

@app.route('/freedata/messages', methods=['POST', 'GET'])
def get_post_freedata_message():
if request.method in ['GET']:
result = DatabaseManagerMessages(app.event_manager).get_all_messages_json()
if request.method == 'GET':
# Get filter parameters from the query string
filters = {
'id': request.args.get('id', default=None, type=str),
'callsign': request.args.get('callsign', default=None, type=str),
'origin_callsign': request.args.get('origin_callsign', default=None, type=str),
'via_callsign': request.args.get('via_callsign', default=None, type=str),
'destination_callsign': request.args.get('destination_callsign', default=None, type=str),
'direction': request.args.get('direction', default=None, type=str)
}

# Remove filters that are None
filters = {k: v for k, v in filters.items() if v is not None}

# Fetch filtered messages from the database
result = DatabaseManagerMessages(app.event_manager).get_all_messages_json(filters=filters)
return api_response(result)

elif request.method in ['POST']:
enqueue_tx_command(command_message_send.SendMessageCommand, request.json)
return api_response(request.json)
else:
api_abort('Error executing command...', 500)

@app.route('/freedata/messages/<string:message_id>', methods=['GET', 'POST', 'PATCH', 'DELETE'])
@app.route('/freedata/messages/<string:message_id>', methods=['GET', 'PATCH', 'DELETE'])
def handle_freedata_message(message_id):
if request.method == 'GET':
message = DatabaseManagerMessages(app.event_manager).get_message_by_id_json(message_id)
return message
elif request.method == 'POST':
result = DatabaseManagerMessages(app.event_manager).update_message(message_id, update_data={'status': 'queued'})
DatabaseManagerMessages(app.event_manager).increment_message_attempts(message_id)
return api_response(result)
elif request.method == 'PATCH':
# Fixme We need to adjust this
result = DatabaseManagerMessages(app.event_manager).mark_message_as_read(message_id)
result = DatabaseManagerMessages(app.event_manager).update_message(message_id, update_data=request.json)
return api_response(result)

elif request.method == 'DELETE':
result = DatabaseManagerMessages(app.event_manager).delete_message(message_id)
return api_response(result)
else:
api_abort('Error executing command...', 500)

@app.route('/freedata/messages/<string:message_id>/retransmit', methods=['PATCH'])
def retransmit_freedata_message(message_id):
if request.method == 'PATCH':
result = DatabaseManagerMessages(app.event_manager).update_message(message_id, update_data={'status': 'queued'})
DatabaseManagerMessages(app.event_manager).increment_message_attempts(message_id)
return api_response(result)
else:
api_abort('Error executing command...', 500)

@app.route('/freedata/messages/<string:message_id>/attachments', methods=['GET'])
def get_message_attachments(message_id):
attachments = DatabaseManagerAttachments(app.event_manager).get_attachments_by_message_id_json(message_id)
Expand Down Expand Up @@ -412,6 +432,7 @@ def main():
# initialize database default values

DatabaseManager(app.event_manager).initialize_default_values()
DatabaseManager(app.event_manager).database_repair_and_cleanup()
wsm.startThreads(app)

conf = app.config_manager.read()
Expand Down
13 changes: 0 additions & 13 deletions tests/test_message_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,9 @@ def testIncrementAttempts(self):
received_message_dict = MessageP2P.to_dict(received_message)
message_id = self.database_manager.add_message(received_message_dict,statistics={},)
self.database_manager.increment_message_attempts(message_id)


result = self.database_manager.get_message_by_id(message_id)
self.assertEqual(result["attempt"], 1)

def testMarkAsRead(self):
apiParams = {'destination': 'DJ2LS-3', 'body': 'Hello World!', 'attachments': []}
message = MessageP2P.from_api_params(self.mycall, apiParams)
payload = message.to_payload()
received_message = MessageP2P.from_payload(payload)
received_message_dict = MessageP2P.to_dict(received_message)
message_id = self.database_manager.add_message(received_message_dict, statistics={},is_read=False)
self.database_manager.mark_message_as_read(message_id)

result = self.database_manager.get_message_by_id(message_id)
self.assertEqual(result["is_read"], True)

if __name__ == '__main__':
unittest.main()