# service_functions.py
import asyncio
import base64
import decimal
import hashlib
import ipaddress
import json
import os
import io
import platform
import statistics
import time
import csv
import uuid
import socket
import subprocess
import random
import re
import sys
import traceback
import html
import tempfile
import warnings
import pickle
import shutil
import sqlite3
import pytz
import PyPDF2
from typing import Any
from collections import defaultdict
from collections.abc import Iterable
from functools import wraps
from cachetools import TTLCache
from diskcache import Cache
from pathlib import Path
from urllib.parse import quote_plus, unquote_plus
from datetime import datetime, timedelta, date, timezone
import pandas as pd
import httpx
from httpx import Timeout
from urllib.parse import urlparse
from logger_config import logger
import zstandard as zstd
from sqlalchemy.exc import OperationalError, InvalidRequestError
from typing import List, Tuple, Dict, Union, Optional, Callable
from decouple import Config as DecoupleConfig, RepositoryEnv
from magika import Magika
import tiktoken
import anthropic
from openai import AsyncOpenAI
from groq import AsyncGroq
from mistralai import Mistral
from cryptography.fernet import Fernet
from fuzzywuzzy import process
from transformers import AutoTokenizer, GPT2TokenizerFast, WhisperTokenizer
import database_code as db_code
from sqlmodel import select, delete, func, SQLModel
from sqlalchemy.exc import IntegrityError
from mutagen import File as MutagenFile
from PIL import Image
import libpastelid
ssh_tunnel_process = None  # Global handle to the SSH tunnel subprocess, if one has been started
tracking_period_start = datetime.utcnow()
rpc_call_stats = defaultdict(lambda: {
"count": 0,
"cumulative_time": 0.0,
"average_time": 0.0,
"success_count": 0,
"total_response_size": 0,
"average_response_size": 0.0,
"timeout_errors": 0,
"connection_errors": 0,
"other_errors": 0
})
pastel_keys_dir = os.path.expanduser("/home/ubuntu/.pastel/pastelkeys")
pastel_signer = libpastelid.PastelSigner(pastel_keys_dir)
encryption_key = None
magika = Magika()
SENSITIVE_ENV_FIELDS = ["LOCAL_PASTEL_ID_PASSPHRASE", "SWISS_ARMY_LLAMA_SECURITY_TOKEN", "OPENAI_API_KEY", "CLAUDE3_API_KEY", "GROQ_API_KEY", "MISTRAL_API_KEY", "STABILITY_API_KEY", "OPENROUTER_API_KEY", "DEEPSEEK_API_KEY"]
LOCAL_PASTEL_ID_PASSPHRASE = None
SWISS_ARMY_LLAMA_SECURITY_TOKEN = None
OPENAI_API_KEY = None
CLAUDE3_API_KEY = None
GROQ_API_KEY = None
MISTRAL_API_KEY = None
STABILITY_API_KEY = None
OPENROUTER_API_KEY = None
DEEPSEEK_API_KEY = None
AVAILABLE_TOOLS: Dict[str, Callable] = {}  # Global dictionary of available tools for use with OpenAI models
USER_FUNCTION_SCHEMAS: Dict[str, dict] = {}  # Module-level dictionary of JSON schemas for user-defined functions
try:
with open("model_menu.json", "r", encoding="utf-8") as f:
MODEL_MENU_DATA = json.load(f)
except FileNotFoundError:
MODEL_MENU_DATA = {}
logger.error("Could not load model_menu.json. Please ensure it is present in the current directory.")
def get_local_ip():
hostname = socket.gethostname()
return socket.gethostbyname(hostname)
def get_env_value(key):
current_dir = os.path.dirname(os.path.abspath(__file__))
env_file_path = os.path.join(current_dir, '.env')
try:
with open(env_file_path, 'r') as env_file:
for line in env_file:
if line.startswith(key + '='):
return line.split('=', 1)[1].strip() # Split on the first '=' to allow for '=' in the value
except FileNotFoundError:
print(f"Error: .env file at {env_file_path} not found.")
print(f"Warning: Key '{key}' not found in .env file.")
return None
def generate_or_load_encryption_key_sync():
key_file_path = os.path.expanduser('~/env_encryption_key_for_supernode_inference_app')
key = None
if os.path.exists(key_file_path): # Check if key file exists and load it
with open(key_file_path, 'rb') as key_file:
key = key_file.read()
try:
Fernet(key) # Validate the key
loaded_or_generated = "loaded"
except ValueError:
key = None
if key is None: # If key is invalid or doesn't exist, generate a new one
logger.info("Invalid or no encryption key found. Generating a new one.")
loaded_or_generated = "generated"
key = Fernet.generate_key()
with open(key_file_path, 'wb') as key_file:
key_file.write(key)
print(f"Generated new encryption key for sensitive env fields: {key}")
encrypt_sensitive_fields(key) # Encrypt sensitive fields if generating key for the first time
logger.info(f"Encryption key {loaded_or_generated} successfully.")
return key
def encrypt_sensitive_fields(key):
current_dir = os.path.dirname(os.path.abspath(__file__))
env_file_path = os.path.join(current_dir, '.env')
cipher_suite = Fernet(key)
with open(env_file_path, 'r') as file: # Load existing .env file
lines = file.readlines()
updated_lines = [] # Encrypt and update sensitive fields
for line in lines:
if any(field in line for field in SENSITIVE_ENV_FIELDS):
for field in SENSITIVE_ENV_FIELDS:
if line.startswith(field):
                    value = line.strip().split('=', 1)[1]  # Split on the first '=' so values containing '=' are kept intact
encrypted_data = cipher_suite.encrypt(value.encode()).decode()
url_encoded_encrypted_data = quote_plus(encrypted_data)
line = f"{field}={url_encoded_encrypted_data}\n"
print(f"Encrypted {field}: {url_encoded_encrypted_data}")
break
updated_lines.append(line)
with open(env_file_path, 'w') as file: # Write the updated lines back to the .env file
file.writelines(updated_lines)
logger.info(f"Updated {len(SENSITIVE_ENV_FIELDS)} sensitive fields in .env file with encrypted values!")
def decrypt_sensitive_data(url_encoded_encrypted_data, encryption_key):
if url_encoded_encrypted_data is None:
raise ValueError("No encrypted data provided for decryption.")
try:
cipher_suite = Fernet(encryption_key)
encrypted_data = unquote_plus(url_encoded_encrypted_data) # URL-decode first
decrypted_data = cipher_suite.decrypt(encrypted_data.encode()).decode() # Ensure this is a bytes-like object
return decrypted_data
except Exception as e:
logger.error(f"Failed to decrypt data: {e}")
raise
def encrypt_sensitive_data(data, encryption_key):
cipher_suite = Fernet(encryption_key)
encrypted_data = cipher_suite.encrypt(data.encode()).decode()
url_encoded_encrypted_data = quote_plus(encrypted_data)
return url_encoded_encrypted_data
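# Illustrative only (not part of the original module): a minimal round-trip of the two
# helpers above, using a throwaway Fernet key and a made-up sample value, to show that
# encrypt_sensitive_data and decrypt_sensitive_data are inverses (Fernet + URL-encoding).
def _example_encrypt_decrypt_roundtrip():
    throwaway_key = Fernet.generate_key()  # hypothetical key, never written to disk
    sample_value = "example-api-key-123"   # made-up sensitive value
    encrypted = encrypt_sensitive_data(sample_value, throwaway_key)   # Fernet-encrypt, then quote_plus
    decrypted = decrypt_sensitive_data(encrypted, throwaway_key)      # unquote_plus, then Fernet-decrypt
    assert decrypted == sample_value
    return encrypted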
def decrypt_sensitive_fields():
global LOCAL_PASTEL_ID_PASSPHRASE, SWISS_ARMY_LLAMA_SECURITY_TOKEN, OPENAI_API_KEY, CLAUDE3_API_KEY, GROQ_API_KEY, MISTRAL_API_KEY, STABILITY_API_KEY, OPENROUTER_API_KEY, DEEPSEEK_API_KEY, encryption_key
LOCAL_PASTEL_ID_PASSPHRASE = decrypt_sensitive_data(get_env_value("LOCAL_PASTEL_ID_PASSPHRASE"), encryption_key)
SWISS_ARMY_LLAMA_SECURITY_TOKEN = decrypt_sensitive_data(get_env_value("SWISS_ARMY_LLAMA_SECURITY_TOKEN"), encryption_key)
OPENAI_API_KEY = decrypt_sensitive_data(get_env_value("OPENAI_API_KEY"), encryption_key)
CLAUDE3_API_KEY = decrypt_sensitive_data(get_env_value("CLAUDE3_API_KEY"), encryption_key)
GROQ_API_KEY = decrypt_sensitive_data(get_env_value("GROQ_API_KEY"), encryption_key)
MISTRAL_API_KEY = decrypt_sensitive_data(get_env_value("MISTRAL_API_KEY"), encryption_key)
STABILITY_API_KEY = decrypt_sensitive_data(get_env_value("STABILITY_API_KEY"), encryption_key)
OPENROUTER_API_KEY = decrypt_sensitive_data(get_env_value("OPENROUTER_API_KEY"), encryption_key)
DEEPSEEK_API_KEY = decrypt_sensitive_data(get_env_value("DEEPSEEK_API_KEY"), encryption_key)
number_of_cpus = os.cpu_count()
my_os = platform.system()
loop = asyncio.get_event_loop()
warnings.filterwarnings('ignore')
local_ip = get_local_ip()
benchmark_results_cache = [] # Global cache to store benchmark results in memory
performance_data_df = pd.DataFrame([{
'IP Address': local_ip,
'Performance Ratio': 1.0, # Default ratio
'Actual Score': 1.0,
'Seconds Since Last Updated': 0
}])
performance_data_history = {}
local_benchmark_csv_file_path = Path('local_sn_micro_benchmark_results.csv')
pickle_file_path = Path('performance_data_history.pkl')
use_libpastelid_for_pastelid_sign_verify = 1
# Configuration for diskcache
CACHE_DIR = './local_credit_pack_cache'
CREDIT_BALANCE_CACHE_INVALIDATION_PERIOD_IN_SECONDS = 5 * 60 # 5 minutes
# Initialize the cache
try:
credit_pack_cache = Cache(CACHE_DIR)
except Exception as e:
# Check if it's the "database disk image is malformed" error
if 'database disk image is malformed' in str(e).lower():
logger.error("Detected 'database disk image is malformed'. Removing local_credit_pack_cache folder...")
# Remove the corrupted cache directory
local_cache_dir = os.path.join(os.path.dirname(__file__), "local_credit_pack_cache")
shutil.rmtree(local_cache_dir, ignore_errors=True)
# Retry after removing cache
credit_pack_cache = Cache(CACHE_DIR)
else:
# If it's some other error, just raise it
raise
use_purge_all_caches = 0
if use_purge_all_caches:
logger.info("Purging all caches...")
credit_pack_cache.clear()
config = DecoupleConfig(RepositoryEnv('.env'))
TEMP_OVERRIDE_LOCALHOST_ONLY = config.get("TEMP_OVERRIDE_LOCALHOST_ONLY", default=0, cast=int)
NUMBER_OF_DAYS_BEFORE_MESSAGES_ARE_CONSIDERED_OBSOLETE = config.get("NUMBER_OF_DAYS_BEFORE_MESSAGES_ARE_CONSIDERED_OBSOLETE", default=3, cast=int)
GITHUB_MODEL_MENU_URL = config.get("GITHUB_MODEL_MENU_URL")
CHALLENGE_EXPIRATION_TIME_IN_SECONDS = config.get("CHALLENGE_EXPIRATION_TIME_IN_SECONDS", default=300, cast=int)
SWISS_ARMY_LLAMA_PORT = config.get("SWISS_ARMY_LLAMA_PORT", default=8089, cast=int)
USE_REMOTE_SWISS_ARMY_LLAMA_IF_AVAILABLE = config.get("USE_REMOTE_SWISS_ARMY_LLAMA_IF_AVAILABLE", default=0, cast=int)
REMOTE_SWISS_ARMY_LLAMA_INSTANCE_SSH_KEY_PATH = config.get("REMOTE_SWISS_ARMY_LLAMA_INSTANCE_SSH_KEY_PATH", default="/home/ubuntu/vastai_privkey")
REMOTE_SWISS_ARMY_LLAMA_INSTANCE_IP_ADDRESSES = config.get("REMOTE_SWISS_ARMY_LLAMA_INSTANCE_IP_ADDRESSES", default="172.219.157.164").split(",")
REMOTE_SWISS_ARMY_LLAMA_INSTANCE_PORTS = [int(port.strip()) for port in config.get("REMOTE_SWISS_ARMY_LLAMA_INSTANCE_PORTS", default="9188").split(",")]
REMOTE_SWISS_ARMY_LLAMA_MAPPED_PORT = config.get("REMOTE_SWISS_ARMY_LLAMA_MAPPED_PORT", default=8087, cast=int)
REMOTE_SWISS_ARMY_LLAMA_EXPOSED_PORT = config.get("REMOTE_SWISS_ARMY_LLAMA_EXPOSED_PORT", default=8089, cast=int)
CREDIT_COST_MULTIPLIER_FACTOR = config.get("CREDIT_COST_MULTIPLIER_FACTOR", default=0.1, cast=float)
MESSAGING_TIMEOUT_IN_SECONDS = config.get("MESSAGING_TIMEOUT_IN_SECONDS", default=60, cast=int)
API_KEY_TESTS_FILE = "api_key_tests.json"
API_KEY_TEST_VALIDITY_HOURS = config.get("API_KEY_TEST_VALIDITY_HOURS", default=72, cast=int)
TARGET_VALUE_PER_CREDIT_IN_USD = config.get("TARGET_VALUE_PER_CREDIT_IN_USD", default=0.1, cast=float)
TARGET_PROFIT_MARGIN = config.get("TARGET_PROFIT_MARGIN", default=0.1, cast=float)
MINIMUM_COST_IN_CREDITS = config.get("MINIMUM_COST_IN_CREDITS", default=0.1, cast=float)
CREDIT_USAGE_TO_TRACKING_AMOUNT_MULTIPLIER = config.get("CREDIT_USAGE_TO_TRACKING_AMOUNT_MULTIPLIER", default=10, cast=int) # Since we always round inference credits to the nearest 0.1, this gives us enough resolution using Patoshis
MAXIMUM_NUMBER_OF_PASTEL_BLOCKS_FOR_USER_TO_SEND_BURN_AMOUNT_FOR_CREDIT_TICKET = config.get("MAXIMUM_NUMBER_OF_PASTEL_BLOCKS_FOR_USER_TO_SEND_BURN_AMOUNT_FOR_CREDIT_TICKET", default=50, cast=int)
MAXIMUM_LOCAL_CREDIT_PRICE_DIFFERENCE_TO_ACCEPT_CREDIT_PRICING = config.get("MAXIMUM_LOCAL_CREDIT_PRICE_DIFFERENCE_TO_ACCEPT_CREDIT_PRICING", default=0.1, cast=float)
MAXIMUM_LOCAL_UTC_TIMESTAMP_DIFFERENCE_IN_SECONDS = config.get("MAXIMUM_LOCAL_UTC_TIMESTAMP_DIFFERENCE_IN_SECONDS", default=15.0, cast=float)
MAXIMUM_LOCAL_PASTEL_BLOCK_HEIGHT_DIFFERENCE_IN_BLOCKS = config.get("MAXIMUM_LOCAL_PASTEL_BLOCK_HEIGHT_DIFFERENCE_IN_BLOCKS", default=1, cast=int)
MINIMUM_NUMBER_OF_PASTEL_BLOCKS_BEFORE_TICKET_STORAGE_RETRY_ALLOWED = config.get("MINIMUM_NUMBER_OF_PASTEL_BLOCKS_BEFORE_TICKET_STORAGE_RETRY_ALLOWED", default=10, cast=int)
MINIMUM_CONFIRMATION_BLOCKS_FOR_CREDIT_PACK_BURN_TRANSACTION = config.get("MINIMUM_CONFIRMATION_BLOCKS_FOR_CREDIT_PACK_BURN_TRANSACTION", default=3, cast=int)
MINIMUM_NUMBER_OF_POTENTIALLY_AGREEING_SUPERNODES = config.get("MINIMUM_NUMBER_OF_POTENTIALLY_AGREEING_SUPERNODES", default=10, cast=int)
MAXIMUM_NUMBER_OF_CONCURRENT_RPC_REQUESTS = config.get("MAXIMUM_NUMBER_OF_CONCURRENT_RPC_REQUESTS", default=30, cast=int)
MICRO_BENCHMARK_PERFORMANCE_RATIO_THRESHOLD = config.get("MICRO_BENCHMARK_PERFORMANCE_RATIO_THRESHOLD", default=0.55, cast=float)
INDIVIDUAL_SUPERNODE_PRICE_AGREEMENT_REQUEST_TIMEOUT_PERIOD_IN_SECONDS = config.get("INDIVIDUAL_SUPERNODE_PRICE_AGREEMENT_REQUEST_TIMEOUT_PERIOD_IN_SECONDS", default=12, cast=int)
INDIVIDUAL_SUPERNODE_MODEL_MENU_REQUEST_TIMEOUT_PERIOD_IN_SECONDS = config.get("INDIVIDUAL_SUPERNODE_MODEL_MENU_REQUEST_TIMEOUT_PERIOD_IN_SECONDS", default=3, cast=int)
BURN_TRANSACTION_MAXIMUM_AGE_IN_DAYS = config.get("BURN_TRANSACTION_MAXIMUM_AGE_IN_DAYS", default=3, cast=float)
SUPERNODE_CREDIT_PRICE_AGREEMENT_QUORUM_PERCENTAGE = config.get("SUPERNODE_CREDIT_PRICE_AGREEMENT_QUORUM_PERCENTAGE", default=0.51, cast=float)
SUPERNODE_CREDIT_PRICE_AGREEMENT_MAJORITY_PERCENTAGE = config.get("SUPERNODE_CREDIT_PRICE_AGREEMENT_MAJORITY_PERCENTAGE", default=0.65, cast=float)
MINIMUM_CREDITS_PER_CREDIT_PACK = config.get("MINIMUM_CREDITS_PER_CREDIT_PACK", default=10, cast=int)
MAXIMUM_CREDITS_PER_CREDIT_PACK = config.get("MAXIMUM_CREDITS_PER_CREDIT_PACK", default=1000000, cast=int)
SUPERNODE_DATA_CACHE_EVICTION_TIME_IN_MINUTES = config.get("SUPERNODE_DATA_CACHE_EVICTION_TIME_IN_MINUTES", default=60, cast=int)
SKIP_BURN_TRANSACTION_BLOCK_CONFIRMATION_CHECK = 1
UVICORN_PORT = config.get("UVICORN_PORT", default=7123, cast=int)
COIN = 100000 # patoshis in 1 PSL
SUPERNODE_DATA_CACHE = TTLCache(maxsize=1, ttl=SUPERNODE_DATA_CACHE_EVICTION_TIME_IN_MINUTES * 60) # Define the cache with a TTL (time to live) in seconds
challenge_store = {}
file_store = {} # In-memory store for files with expiration times
def async_disk_cached(cache, ttl=None):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Create a unique cache key based on the function name and arguments
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
try:
cached_result = cache.get(cache_key, default=None, expire_time=True)
except sqlite3.DatabaseError as e:
if 'database disk image is malformed' in str(e).lower():
logger.error("Cache database is malformed. Deleting cache directory...")
try:
# Remove the corrupted cache directory
cache_dir = cache.directory if hasattr(cache, 'directory') else 'path_to_default_cache_directory'
shutil.rmtree(cache_dir, ignore_errors=True)
logger.info(f"Deleted cache directory: {cache_dir}")
# Recreate the cache directory
os.makedirs(cache_dir, exist_ok=True)
logger.info(f"Recreated cache directory: {cache_dir}")
except Exception as delete_error:
logger.error(f"Failed to delete cache directory {cache_dir}: {delete_error}")
raise
# Retry retrieving from cache after clearing
try:
cached_result = cache.get(cache_key, default=None, expire_time=True)
except Exception as retry_error:
logger.error(f"Failed to retrieve cache after clearing: {retry_error}")
cached_result = None
else:
# If it's a different SQLite error, re-raise it
logger.error(f"Unhandled SQLite DatabaseError: {e}")
raise
# Check if cached_result is a valid, non-None value
if cached_result is not None and not (isinstance(cached_result, tuple) and all(v is None for v in cached_result)):
if isinstance(cached_result, tuple) and len(cached_result) == 2:
value, expire_time = cached_result
if value is not None and (ttl is None or expire_time is None or expire_time > 0):
return value
elif cached_result is not None:
return cached_result
try:
value = await func(*args, **kwargs)
except Exception as e:
logger.error(f"Exception in {func.__name__}: {str(e)}")
logger.error(f"Exception type: {type(e).__name__}")
logger.error(f"Exception details: {traceback.format_exc()}")
raise
if value is not None:
try:
cache.set(cache_key, value, expire=ttl)
except sqlite3.DatabaseError as e:
if 'database disk image is malformed' in str(e).lower():
logger.error("Cache database is malformed during set operation. Deleting cache directory...")
try:
cache_dir = cache.directory if hasattr(cache, 'directory') else 'path_to_default_cache_directory'
shutil.rmtree(cache_dir, ignore_errors=True)
logger.info(f"Deleted cache directory: {cache_dir}")
os.makedirs(cache_dir, exist_ok=True)
logger.info(f"Recreated cache directory: {cache_dir}")
except Exception as delete_error:
logger.error(f"Failed to delete cache directory {cache_dir}: {delete_error}")
raise
# Optionally, you can retry setting the cache here
else:
logger.error(f"Unhandled SQLite DatabaseError during set: {e}")
raise
except Exception as set_error:
logger.error(f"Failed to set cache for {func.__name__}: {set_error}")
else:
logger.warning(f"Not caching None value for {func.__name__}")
return value
return wrapper
return decorator
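# Illustrative only (not part of the original module): how the decorator above is meant
# to be applied. The function name and TTL here are made up; `credit_pack_cache` is the
# diskcache instance initialized earlier in this file.
@async_disk_cached(credit_pack_cache, ttl=60)
async def _example_expensive_lookup(ticket_txid: str) -> dict:
    # The first call for a given txid runs the body and stores the result for 60 seconds;
    # later calls with the same arguments are served from the on-disk cache.
    await asyncio.sleep(0.1)  # stand-in for a slow RPC or network call
    return {"txid": ticket_txid, "fetched_at": datetime.utcnow().isoformat()}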
# Initialize PastelSigner
pastel_keys_dir = os.path.expanduser("~/.pastel/pastelkeys")
pastel_signer = libpastelid.PastelSigner(pastel_keys_dir)
def parse_timestamp(timestamp_str):
try:
# Attempt to parse with fractional seconds
return pd.to_datetime(timestamp_str, format='%Y-%m-%dT%H:%M:%S.%f')
except ValueError:
# Fall back to parsing without fractional seconds
return pd.to_datetime(timestamp_str, format='%Y-%m-%dT%H:%M:%S')
def parse_and_format(value):
try:
# Check if the JSON string is already formatted
if isinstance(value, str) and "\n" in value:
return value
# Unescape the JSON string if it's a string
if isinstance(value, str):
unescaped_value = json.loads(json.dumps(value))
parsed_value = json.loads(unescaped_value)
else:
parsed_value = value
return json.dumps(parsed_value, indent=4)
except (json.JSONDecodeError, TypeError):
return value
def normalize_data(data):
if isinstance(data, dict):
return {key: normalize_data(value) for key, value in data.items()}
elif isinstance(data, list):
return [normalize_data(item) for item in data]
elif isinstance(data, uuid.UUID):
return str(data)
elif isinstance(data, datetime):
if data.tzinfo is None:
# Make the datetime offset-aware with UTC timezone
return data.replace(tzinfo=pytz.UTC)
else:
# Convert to UTC timezone
return data.astimezone(pytz.UTC)
else:
return data
def format_list(input_list):
def json_serialize(item):
if isinstance(item, uuid.UUID):
return json.dumps(str(item), indent=4)
elif isinstance(item, dict):
return json.dumps(pretty_json_func(item), indent=4)
elif isinstance(item, list):
return format_list(item)
else:
return json.dumps(item, indent=4)
formatted_list = "[\n" + ",\n".join(" " + json_serialize(item).replace("\n", "\n ") for item in input_list) + "\n]"
return formatted_list
def pretty_json_func(data):
if isinstance(data, SQLModel):
data = data.dict() # Convert SQLModel instance to dictionary
if isinstance(data, dict):
formatted_data = {}
for key, value in data.items():
if isinstance(value, uuid.UUID): # Convert UUIDs to string
formatted_data[key] = str(value)
elif isinstance(value, dict): # Recursively handle dictionary values
formatted_data[key] = pretty_json_func(value)
elif isinstance(value, list): # Special handling for lists
formatted_data[key] = format_list(value)
elif key.endswith("_json"): # Handle keys that end with '_json'
formatted_data[key] = parse_and_format(value)
else: # Handle other types of values
formatted_data[key] = value
return json.dumps(formatted_data, indent=4)
elif isinstance(data, list): # Top-level list handling
return format_list(data)
elif isinstance(data, str): # Handle string type data separately
return parse_and_format(data)
else:
return data # Return data as is if not a dictionary or string
def abbreviated_pretty_json_func(data):
max_payload_length_in_characters = 10000
formatted_payload = pretty_json_func(data)
if len(formatted_payload) > max_payload_length_in_characters:
        truncated_payload = formatted_payload[:max_payload_length_in_characters]
        abbreviated_payload = truncated_payload + "..."
        closing_brackets = "]" * (truncated_payload.count("[") - truncated_payload.count("]"))  # close any brackets left open by the truncation
        closing_brackets += "}" * (truncated_payload.count("{") - truncated_payload.count("}"))
        abbreviated_payload += closing_brackets
formatted_payload = abbreviated_payload
return formatted_payload
def log_action_with_payload(action_string, payload_name, json_payload):
formatted_payload = abbreviated_pretty_json_func(json_payload)
logger.info(f"Now {action_string} {payload_name} with payload:\n{formatted_payload}")
def get_local_rpc_settings_func(directory_with_pastel_conf=os.path.expanduser("~/.pastel/")):
with open(os.path.join(directory_with_pastel_conf, "pastel.conf"), 'r') as f:
lines = f.readlines()
other_flags = {}
rpchost = '127.0.0.1'
rpcport = '19932'
rpcuser = None
rpcpassword = None
for line in lines:
line = line.strip()
if not line or line.startswith('#'): # Ignore blank lines and comments
continue
if '=' in line:
key, value = line.split('=', 1) # Split only on the first '='
key = key.strip()
value = value.strip()
if key == 'rpcport':
rpcport = value
elif key == 'rpcuser':
rpcuser = value
elif key == 'rpcpassword':
rpcpassword = value
elif key == 'rpchost':
rpchost = value
else:
other_flags[key] = value
return rpchost, rpcport, rpcuser, rpcpassword, other_flags
def get_network_info(rpc_port):
if rpc_port == '9932':
network = 'mainnet'
burn_address = 'PtpasteLBurnAddressXXXXXXXXXXbJ5ndd'
elif rpc_port == '19932':
network = 'testnet'
burn_address = 'tPpasteLBurnAddressXXXXXXXXXXX3wy7u'
elif rpc_port == '29932':
network = 'devnet'
burn_address = '44oUgmZSL997veFEQDq569wv5tsT6KXf9QY7'
else:
raise ValueError(f"Unknown RPC port: {rpc_port}")
return network, burn_address
def required_collateral(network):
if network == 'mainnet':
return 5000000 # 5 million PSL for mainnet
else:
return 1000000 # 1 million PSL for testnet/devnet
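# Illustrative only (not part of the original module): reading the local pastel.conf and
# deriving the network name, burn address, and collateral requirement from it. Assumes a
# readable ~/.pastel/pastel.conf; the actual values depend on the local node's settings.
def _example_show_network_settings():
    rpc_host, rpc_port, rpc_user, rpc_password, _other_flags = get_local_rpc_settings_func()
    network, burn_address = get_network_info(rpc_port)  # e.g. port '19932' -> ('testnet', 'tPpasteLBurnAddressXXXXXXXXXXX3wy7u')
    collateral = required_collateral(network)           # 5,000,000 PSL on mainnet, else 1,000,000 PSL
    return network, burn_address, collateral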
def write_rpc_settings_to_env_file_func(rpc_host, rpc_port, rpc_user, rpc_password, other_flags):
with open('.env', 'w') as f:
f.write(f"RPC_HOST={rpc_host}\n")
f.write(f"RPC_PORT={rpc_port}\n")
f.write(f"RPC_USER={rpc_user}\n")
f.write(f"RPC_PASSWORD={rpc_password}\n")
for current_flag in other_flags:
current_value = other_flags[current_flag]
try:
f.write(f"{current_flag}={current_value}\n")
except Exception as e:
logger.error(f"Error writing to .env file: {e}")
pass
return
def is_base64_encoded(data):
if not isinstance(data, str):
return False
if len(data) % 4 != 0:
return False
base64_pattern = re.compile(r'^[A-Za-z0-9+/]+={0,2}$')
if not base64_pattern.match(data):
return False
try:
base64.b64decode(data, validate=True)
return True
except Exception:
return False
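# Illustrative only (not part of the original module): the helper above accepts properly
# padded standard base64 and rejects plain text (and URL-safe variants using '-' or '_').
def _example_base64_checks():
    encoded = base64.b64encode(b"hello world").decode()  # 'aGVsbG8gd29ybGQ='
    return is_base64_encoded(encoded), is_base64_encoded("not base64!")  # (True, False)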
async def check_tunnel_health():
global ssh_tunnel_process
while True:
await asyncio.sleep(60) # Check every minute
if USE_REMOTE_SWISS_ARMY_LLAMA_IF_AVAILABLE and ssh_tunnel_process is not None:
if ssh_tunnel_process.returncode is not None:
logger.warning("SSH tunnel process died, attempting to reestablish...")
kill_open_ssh_tunnels(REMOTE_SWISS_ARMY_LLAMA_MAPPED_PORT)
await establish_ssh_tunnel()
async def cleanup_ssh_tunnel():
global ssh_tunnel_process
if ssh_tunnel_process is not None:
try:
ssh_tunnel_process.terminate()
await ssh_tunnel_process.wait()
except Exception as e:
logger.error(f"Error cleaning up SSH tunnel: {e}")
def kill_open_ssh_tunnels(local_port):
try:
# First try to terminate the global process if it exists
global ssh_tunnel_process
if ssh_tunnel_process and ssh_tunnel_process.returncode is None:
try:
ssh_tunnel_process.terminate()
except Exception as e:
logger.error(f"Error terminating existing SSH process: {e}")
ssh_tunnel_process = None
# Find processes listening on the specified port
lsof_command = [
"lsof", "-i", f"TCP:{local_port}", "-t" # -t outputs only the PID
]
result = subprocess.run(lsof_command, capture_output=True, text=True)
if result.stdout:
pids = result.stdout.strip().split('\n')
for pid in pids:
try:
subprocess.run(["kill", "-15", pid]) # Try SIGTERM first
logger.info(f"Sent SIGTERM to process with PID: {pid}")
time.sleep(1) # Give it a second to terminate gracefully
# Check if process still exists and force kill if necessary
if subprocess.run(["ps", "-p", pid], capture_output=True).returncode == 0:
subprocess.run(["kill", "-9", pid])
logger.info(f"Sent SIGKILL to persistent process with PID: {pid}")
except Exception as e:
logger.error(f"Error killing process {pid}: {e}")
# Also kill any ssh processes with the specific port forward
ps_command = ["ps", "aux"]
result = subprocess.run(ps_command, capture_output=True, text=True)
if result.stdout:
for line in result.stdout.split('\n'):
if f':{local_port}:' in line and 'ssh' in line:
try:
pid = line.split()[1]
subprocess.run(["kill", "-15", pid])
time.sleep(1)
if subprocess.run(["ps", "-p", pid], capture_output=True).returncode == 0:
subprocess.run(["kill", "-9", pid])
except Exception as e:
logger.error(f"Error killing SSH process: {e}")
except Exception as e:
logger.error(f"Error while killing SSH tunnels: {e}")
logger.error(f"Error type: {type(e).__name__}")
logger.error(f"Error details: {traceback.format_exc()}")
def get_remote_swiss_army_llama_instances() -> List[Tuple[str, int]]:
ip_addresses = config.get("REMOTE_SWISS_ARMY_LLAMA_INSTANCE_IP_ADDRESSES", "").split(",")
ports = config.get("REMOTE_SWISS_ARMY_LLAMA_INSTANCE_PORTS", "").split(",")
if len(ip_addresses) != len(ports):
logger.error("Mismatch between number of IP addresses and ports for remote Swiss Army Llama instances")
return []
return list(zip(ip_addresses, [int(port) for port in ports]))
async def establish_ssh_tunnel():
global ssh_tunnel_process
if USE_REMOTE_SWISS_ARMY_LLAMA_IF_AVAILABLE:
instances = get_remote_swiss_army_llama_instances()
random.shuffle(instances) # Randomize the order of instances
# Kill any open tunnels once, before starting new ones
kill_open_ssh_tunnels(REMOTE_SWISS_ARMY_LLAMA_MAPPED_PORT)
for ip_address, port in instances:
key_path = REMOTE_SWISS_ARMY_LLAMA_INSTANCE_SSH_KEY_PATH
if not os.access(key_path, os.R_OK):
raise PermissionError(f"SSH key file at {key_path} is not readable.")
current_permissions = os.stat(key_path).st_mode & 0o777
if current_permissions != 0o600:
os.chmod(key_path, 0o600)
logger.info("Permissions for SSH key file set to 600.")
            process = None  # ensure the name is bound even if subprocess creation fails below
            try:
cmd = [
'ssh',
'-i', key_path,
'-p', str(port),
'-L', f'{REMOTE_SWISS_ARMY_LLAMA_MAPPED_PORT}:localhost:{REMOTE_SWISS_ARMY_LLAMA_EXPOSED_PORT}',
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
'-o', 'ExitOnForwardFailure=yes',
'-N',
f'root@{ip_address}'
]
# Start the SSH process
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Wait a bit to ensure the tunnel is established
await asyncio.sleep(2)
# Check if process is still running
if process.returncode is None:
logger.info(f"SSH tunnel established to {ip_address}:{port}")
ssh_tunnel_process = process
return
else:
stdout, stderr = await process.communicate()
logger.error(f"SSH process failed. stdout: {stdout.decode()}, stderr: {stderr.decode()}")
except Exception as e:
logger.error(f"Error establishing SSH tunnel to {ip_address}:{port}: {e}")
if process and process.returncode is None:
try:
process.terminate()
await process.wait()
except Exception as cleanup_error:
logger.error(f"Error terminating failed SSH process: {cleanup_error}")
logger.error("Failed to establish SSH tunnel to any remote Swiss Army Llama instance")
else:
logger.info("Remote Swiss Army Llama is not enabled. Using local instance.")
def get_audio_length(audio_input) -> float:
if isinstance(audio_input, bytes):
audio_file = io.BytesIO(audio_input)
audio = MutagenFile(audio_file)
elif isinstance(audio_input, str):
audio = MutagenFile(audio_input)
else:
raise ValueError("audio_input must be either bytes or a file path string.")
if audio is None or not hasattr(audio.info, 'length'):
raise ValueError("Could not determine the length of the audio file.")
return audio.info.length
def convert_uuids_to_strings(data):
if isinstance(data, dict):
return {key: convert_uuids_to_strings(value) for key, value in data.items()}
elif isinstance(data, list):
return [convert_uuids_to_strings(item) for item in data]
elif isinstance(data, uuid.UUID):
return str(data)
else:
return data
def compute_sha3_256_hexdigest(input_str: str):
"""Compute the SHA3-256 hash of the input string and return the hexadecimal digest."""
return hashlib.sha3_256(input_str.encode('utf-8')).hexdigest()
def compute_sha3_256_hexdigest_of_file(file_data: bytes):
return hashlib.sha3_256(file_data).hexdigest()
def compute_function_string_hash(fn_code: str) -> str:
"""SHA3-256 hash of the code, for DB lookups."""
return hashlib.sha3_256(fn_code.encode("utf-8")).hexdigest()
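# Illustrative only (not part of the original module): the hash helpers above all produce
# 64-character hex SHA3-256 digests; the string and bytes variants agree whenever the
# bytes are the UTF-8 encoding of the string.
def _example_sha3_helpers():
    text_digest = compute_sha3_256_hexdigest("pastel")
    bytes_digest = compute_sha3_256_hexdigest_of_file("pastel".encode("utf-8"))
    assert text_digest == bytes_digest and len(text_digest) == 64
    return text_digest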
def remove_file(path: str):
if os.path.exists(path):
os.remove(path)
if path in file_store:
del file_store[path]
async def save_file(file_content: bytes, filename: str):
file_location = os.path.join(tempfile.gettempdir(), filename)
with open(file_location, "wb") as buffer:
buffer.write(file_content)
file_hash = compute_sha3_256_hexdigest_of_file(file_content)
file_size = os.path.getsize(file_location) # Calculate file size
expire_at = datetime.utcnow() + timedelta(hours=24) # Set expiration time (24 hours)
file_store[file_location] = expire_at
return file_location, file_hash, file_size
async def upload_and_get_file_metadata(file_content: bytes, file_prefix: str = "document") -> Dict:
file_name = f"{file_prefix}_{compute_sha3_256_hexdigest_of_file(file_content)[:8]}.{magika.identify_bytes(file_content).output.ct_label}"
file_location, file_hash, file_size = await save_file(file_content, file_name)
external_ip = get_external_ip_func()
file_url = f"http://{external_ip}:{UVICORN_PORT}/download/{file_name}"
return {
"file_location": file_location,
"file_hash": file_hash,
"file_size": file_size,
"file_url": file_url
}
async def calculate_xor_distance(pastelid1: str, pastelid2: str) -> int:
hash1 = compute_sha3_256_hexdigest(pastelid1)
hash2 = compute_sha3_256_hexdigest(pastelid2)
xor_result = int(hash1, 16) ^ int(hash2, 16)
return xor_result
async def get_supernode_url_from_pastelid_func(pastelid: str, supernode_list_df: pd.DataFrame) -> str:
supernode_row = supernode_list_df[supernode_list_df['extKey'] == pastelid]
if not supernode_row.empty:
supernode_ipaddress_port = supernode_row['ipaddress:port'].values[0]
ipaddress = supernode_ipaddress_port.split(':')[0]
supernode_url = f"http://{ipaddress}:7123"
return supernode_url
else:
raise ValueError(f"Supernode with PastelID {pastelid} not found in the supernode list")
async def get_closest_supernode_pastelid_from_list(local_pastelid: str, supernode_pastelids: List[str]) -> str:
xor_distances = [(supernode_pastelid, await calculate_xor_distance(local_pastelid, supernode_pastelid)) for supernode_pastelid in supernode_pastelids]
closest_supernode = min(xor_distances, key=lambda x: x[1])
return closest_supernode[0]
async def get_closest_supernode_to_pastelid_url(input_pastelid: str, supernode_list_df: pd.DataFrame) -> Tuple[Optional[str], Optional[str]]:
if not supernode_list_df.empty:
list_of_supernode_pastelids = supernode_list_df['extKey'].tolist()
closest_supernode_pastelid = await get_closest_supernode_pastelid_from_list(input_pastelid, list_of_supernode_pastelids)
supernode_url = await get_supernode_url_from_pastelid_func(closest_supernode_pastelid, supernode_list_df)
return supernode_url, closest_supernode_pastelid
return None, None
async def get_n_closest_supernodes_to_pastelid_urls(n: int, input_pastelid: str, supernode_list_df: pd.DataFrame) -> List[Tuple[str, str]]:
if not supernode_list_df.empty:
list_of_supernode_pastelids = supernode_list_df['extKey'].tolist()
xor_distances = [(supernode_pastelid, await calculate_xor_distance(input_pastelid, supernode_pastelid)) for supernode_pastelid in list_of_supernode_pastelids]
sorted_xor_distances = sorted(xor_distances, key=lambda x: x[1])
closest_supernodes = sorted_xor_distances[:n]
supernode_urls_and_pastelids = [(await get_supernode_url_from_pastelid_func(pastelid, supernode_list_df), pastelid) for pastelid, _ in closest_supernodes]
return supernode_urls_and_pastelids
return []
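# Illustrative only (not part of the original module): how XOR distance over SHA3-256
# digests is used above to rank supernodes by "closeness" to a PastelID. The IDs below
# are made-up placeholders, not real PastelIDs.
async def _example_rank_supernodes_by_xor_distance():
    local_pastelid = "jXExampleLocalPastelID000000000000000000"      # hypothetical
    candidate_pastelids = [
        "jXExampleSupernodeA0000000000000000000000",                 # hypothetical
        "jXExampleSupernodeB0000000000000000000000",                 # hypothetical
    ]
    distances = [(sn, await calculate_xor_distance(local_pastelid, sn)) for sn in candidate_pastelids]
    closest = await get_closest_supernode_pastelid_from_list(local_pastelid, candidate_pastelids)
    assert closest == min(distances, key=lambda x: x[1])[0]
    return closest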
class JSONRPCException(Exception):
def __init__(self, rpc_error):
parent_args = []
try:
parent_args.append(rpc_error['message'])
except Exception as e:
logger.error(f"Error occurred in JSONRPCException: {e}")
pass
Exception.__init__(self, *parent_args)
self.error = rpc_error
self.code = rpc_error['code'] if 'code' in rpc_error else None
self.message = rpc_error['message'] if 'message' in rpc_error else None
def __str__(self):
return '%d: %s' % (self.code, self.message)
def __repr__(self):
return '<%s \'%s\'>' % (self.__class__.__name__, self)
def EncodeDecimal(o):
if isinstance(o, decimal.Decimal):
return float(round(o, 8))
raise TypeError(repr(o) + " is not JSON serializable")
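# Illustrative only (not part of the original module): EncodeDecimal is intended to be
# passed as the `default=` hook to json.dumps so Decimal amounts serialize as floats
# rounded to 8 places (the patoshi precision used elsewhere in this file).
def _example_encode_decimal():
    payload = {"amount_psl": decimal.Decimal("123.456789")}
    return json.dumps(payload, default=EncodeDecimal)  # '{"amount_psl": 123.456789}'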
class AsyncAuthServiceProxy:
_semaphore = asyncio.BoundedSemaphore(MAXIMUM_NUMBER_OF_CONCURRENT_RPC_REQUESTS)
def __init__(self, service_url, service_name=None, reconnect_timeout=2, reconnect_amount=3, request_timeout=20):
self.service_url = service_url
self.service_name = service_name
self.url = urlparse(service_url)
self.id_count = 0
user = self.url.username
password = self.url.password
authpair = f"{user}:{password}".encode('utf-8')
self.auth_header = b'Basic ' + base64.b64encode(authpair)
self.reconnect_timeout = reconnect_timeout
self.reconnect_amount = reconnect_amount
self.request_timeout = request_timeout
self.client = httpx.AsyncClient(timeout=request_timeout, http2=True)
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
raise AttributeError
if self.service_name is not None:
name = f"{self.service_name}.{name}"
return AsyncAuthServiceProxy(self.service_url, name)
async def __call__(self, *args):
async with self._semaphore:
self.id_count += 1
postdata = json.dumps({
'version': '2.0',
'method': self.service_name,
'params': args,
'id': self.id_count
})
headers = {
'Host': self.url.hostname,
'User-Agent': "AuthServiceProxy/0.1",
'Authorization': self.auth_header.decode(),
'Content-type': 'application/json',
'Connection': 'keep-alive'
}
for i in range(self.reconnect_amount):
try:
if i > 0:
logger.warning(f"Reconnect try #{i+1}")
sleep_time = self.reconnect_timeout * (2 ** i)
logger.info(f"Waiting for {sleep_time} seconds before retrying.")
await asyncio.sleep(sleep_time)
response = await self.client.post(
self.service_url,
headers=headers,
content=postdata
)
response.raise_for_status()
response_json = response.json()
break
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred in __call__: {e}")
except httpx.RequestError as e:
logger.error(f"Request error occurred in __call__: {e}")
except Exception as e:
logger.error(f"Unexpected error occurred in __call__: {e}")
else:
logger.error("Reconnect tries exceeded.")
return
if 'error' in response_json and response_json['error'] is not None:
raise JSONRPCException(response_json['error'])
elif 'result' not in response_json:
raise JSONRPCException({
'code': -343, 'message': 'missing JSON-RPC result'})
else:
return response_json['result']
async def close(self):
await self.client.aclose()
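# Illustrative only (not part of the original module): constructing the RPC proxy above
# from the local pastel.conf credentials and issuing a call. Attribute access builds the
# JSON-RPC method name via __getattr__, and awaiting the result sends the request via
# __call__. Assumes a running local pasteld accepting RPC connections.
async def _example_rpc_getinfo():
    rpc_host, rpc_port, rpc_user, rpc_password, _ = get_local_rpc_settings_func()
    rpc_connection = AsyncAuthServiceProxy(f"http://{rpc_user}:{rpc_password}@{rpc_host}:{rpc_port}")
    try:
        info = await rpc_connection.getinfo()  # same shape of call as the tracked wrappers below
    finally:
        await rpc_connection.close()
    return info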
async def save_stats_to_json():
global rpc_call_stats, tracking_period_start # Move both global declarations here
while True:
await asyncio.sleep(3600) # Adjust this value for how often you want to save stats (e.g., every hour)
tracking_period_end = datetime.utcnow()
stats_snapshot = {
"tracking_period_start": tracking_period_start.isoformat() + 'Z',
"tracking_period_end": tracking_period_end.isoformat() + 'Z',
"rpc_call_stats": dict(rpc_call_stats)
}
# Append the stats to the JSON file
try:
with open('rpc_call_stats.json', 'a') as f:
f.write(json.dumps(stats_snapshot) + '\n')
except Exception as e:
print(f"Failed to save stats to JSON: {e}")
# Reset tracking for the next period
rpc_call_stats = defaultdict(lambda: {
"count": 0,
"cumulative_time": 0.0,
"average_time": 0.0,
"success_count": 0,
"total_response_size": 0,
"average_response_size": 0.0,
"timeout_errors": 0,
"connection_errors": 0,
"other_errors": 0
})
tracking_period_start = tracking_period_end
def track_rpc_call(func):
@wraps(func)
async def wrapper(*args, **kwargs):
method_name = kwargs.get('method_name', func.__name__)
def make_hashable(obj):
if isinstance(obj, (list, tuple)):
return tuple(make_hashable(e) for e in obj)
elif isinstance(obj, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in obj.items()))
elif isinstance(obj, set):
return frozenset(make_hashable(e) for e in obj)
return obj
hashable_args = make_hashable(args)
hashable_kwargs = make_hashable(kwargs)
rpc_key = (method_name, hashable_args, hashable_kwargs)
start_time = time.time()
try:
result = await func(*args, **kwargs)
elapsed_time = time.time() - start_time
response_size = len(str(result).encode('utf-8'))
if rpc_key not in rpc_call_stats:
rpc_call_stats[rpc_key] = {
"count": 0,
"cumulative_time": 0.0,
"average_time": 0.0,
"success_count": 0,
"total_response_size": 0,
"average_response_size": 0.0,
"timeout_errors": 0,
"connection_errors": 0,
"other_errors": 0
}
rpc_call_stats[rpc_key]["count"] += 1
rpc_call_stats[rpc_key]["cumulative_time"] += elapsed_time
rpc_call_stats[rpc_key]["average_time"] = (
rpc_call_stats[rpc_key]["cumulative_time"] / rpc_call_stats[rpc_key]["count"]
)
rpc_call_stats[rpc_key]["total_response_size"] += response_size
rpc_call_stats[rpc_key]["average_response_size"] = (
rpc_call_stats[rpc_key]["total_response_size"] / rpc_call_stats[rpc_key]["count"]
)
rpc_call_stats[rpc_key]["success_count"] += 1
return result
except httpx.TimeoutException:
if rpc_key not in rpc_call_stats:
rpc_call_stats[rpc_key] = {"timeout_errors": 0}
rpc_call_stats[rpc_key]["timeout_errors"] = rpc_call_stats[rpc_key].get("timeout_errors", 0) + 1
raise
except httpx.ConnectError:
if rpc_key not in rpc_call_stats:
rpc_call_stats[rpc_key] = {"connection_errors": 0}
rpc_call_stats[rpc_key]["connection_errors"] = rpc_call_stats[rpc_key].get("connection_errors", 0) + 1
raise
except Exception as e:
if rpc_key not in rpc_call_stats:
rpc_call_stats[rpc_key] = {"other_errors": 0}
rpc_call_stats[rpc_key]["other_errors"] = rpc_call_stats[rpc_key].get("other_errors", 0) + 1
raise e
return wrapper
# Wrapped RPC calls so we can track them and log their performance:
@track_rpc_call
async def getinfo(rpc_connection):
return await rpc_connection.getinfo()
@track_rpc_call
async def getblockcount(rpc_connection):
return await rpc_connection.getblockcount()
@track_rpc_call
async def getblockhash(rpc_connection, block_height):
return await rpc_connection.getblockhash(block_height)
@track_rpc_call
async def getblock(rpc_connection, block_hash):
return await rpc_connection.getblock(block_hash)
@track_rpc_call
async def listaddressamounts(rpc_connection):
return await rpc_connection.listaddressamounts()
@track_rpc_call
async def z_getbalance(rpc_connection, address_to_check):
return await rpc_connection.z_getbalance(address_to_check)
@track_rpc_call
async def getrawtransaction(rpc_connection, txid, verbose=1):
return await rpc_connection.getrawtransaction(txid, verbose)
@track_rpc_call
async def masternode_top(rpc_connection):
return await rpc_connection.masternode('top')
@track_rpc_call
async def masternodelist_full(rpc_connection):
return await rpc_connection.masternodelist('full')
@track_rpc_call
async def masternodelist_rank(rpc_connection):