diff --git a/.gitignore b/.gitignore index 55ab6ab..792dc88 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,8 @@ src/bin examples/* # Ignore the example folder (personal to each user) general-results/* +# Ignore the website summary folder +website-summary/* # Ignore System Files (from MAC) .DS_Store diff --git a/armiarma.sh b/armiarma.sh index 16e3e31..09287ee 100755 --- a/armiarma.sh +++ b/armiarma.sh @@ -173,7 +173,7 @@ LaunchCrawler(){ echo "" # Finaly launch Rumor form the Network File (showing the logs on terminal mode) - ../../src/bin/armiarma file launcher.rumor --formatter="terminal" --level="error" + sh -c 'echo "PID of the crawler: $$"; echo ""; exec ../../src/bin/armiarma file launcher.rumor --formatter="terminal" --level="error"' # Check if the compilation has been successful exec_error="$?" if [[ "$exec_error" -ne "0" ]] @@ -240,9 +240,8 @@ LaunchAnalyzer(){ echo "" # Set the Paths for the gossip-metrics.json peerstore.json and output - csv="${folderPath}/examples/${aux}/metrics/metrics.csv" - peerstore="${folderPath}/examples/${aux}/metrics/peerstore.json" - extrametrics="${folderPath}/examples/${aux}/metrics/extra-metrics.csv" + csv="${folderPath}/examples/${aux}/metrics.csv" + peerstore="${folderPath}/examples/${aux}/peerstore.json" plots="${folderPath}/examples/${aux}/plots" @@ -255,7 +254,7 @@ LaunchAnalyzer(){ # Run the Analyzer echo " Launching analyzer" echo "" - python3 ./src/analyzer/armiarma-analyzer.py "$csv" "$peerstore" "$extrametrics" "$plots" + python3 ./src/analyzer/armiarma-analyzer.py "$csv" "$peerstore" "$plots" # Deactivate the VENV deactivate @@ -286,7 +285,7 @@ LaunchGeneralResults(){ # Run the Analyzer echo " Launching General Overview Analyzer" echo "" - python3 ./src/analyzer/total-overview-analysis.py ./examples ./general-results + python3 ./src/analyzer/total-overview-analysis.py "$1" ./general-results echo "results available in \$ARMIARMA/results" echo "" # Deactivate the VENV @@ -300,6 +299,8 @@ LaunchGeneralResults(){ # 0. Get the options go version +pid="$!" +echo "PID: $pid" # Generate the examples folder if [[ -d ./examples ]]; then @@ -383,7 +384,7 @@ while getopts ":hcpfdo" option; do o) # Generate the general overview of the previously generated projects - LaunchGeneralResults + LaunchGeneralResults "$2" echo "Overview Analyzer Finished!" echo "" diff --git a/src/analyzer/armiarma-analyzer.py b/src/analyzer/armiarma-analyzer.py index 538a87f..b9776ac 100644 --- a/src/analyzer/armiarma-analyzer.py +++ b/src/analyzer/armiarma-analyzer.py @@ -57,7 +57,7 @@ def plotFromPandas(panda, pdf, opts): # Check is there is Value on top of the charts if opts['barValues'] is not None: - for ind, value in enumerate(yarray): + for ind, value in enumerate(opts['barValues']): plt.text(ind, value, str(value), fontsize=opts['textSize'], horizontalalignment='center') # Check id the grid has been set @@ -81,7 +81,7 @@ def plotStackBarsFromArrays(xarray, yarray, pdf, opts): fig, ax = plt.subplots(figsize = opts['figSize']) - colors = ['mediumseagreen', 'indianred', 'goldenrod','cornflowerblue'] + colors = ['mediumseagreen', 'springgreen', 'indianred', 'goldenrod', 'cornflowerblue'] legends=[] auxI = 0 bottom = [0,0] # done by hand @@ -526,14 +526,12 @@ def main(): # End of plotting variables csvFile = sys.argv[1] peerstoreFile = sys.argv[2] - extraMetrics = sys.argv[3] - outputFigsFolder = sys.argv[4] + outputFigsFolder = sys.argv[3] pdfFile = outputFigsFolder + "/MetricsSummary.pdf" peerstorePanda = getPandaFromPeerstoreJson(peerstoreFile) rumorMetricsPanda = pd.read_csv(csvFile) - extraPeerData = pd.read_csv(extraMetrics) # ---------- PLOT SECUENCE ----------- @@ -553,7 +551,7 @@ def main(): cnt9000 = 0 cntOthers = 0 noAddrs = 0 - + peerstoreLen = len(peerstore) metricsLen = len(rumorMetricsPanda) print() @@ -569,10 +567,25 @@ def main(): except: noAddrs = noAddrs + 1 + # Unknown Peers with 13000 port + unk13000 = 0 + tcp13000 = 0 + cunk1300 = 0 + for index, row in rumorMetricsPanda.iterrows(): + + if '/13000' in str(row['Address']): + tcp13000 = tcp13000 +1 + if row['Client'] == 'Unknown': + unk13000 = unk13000 +1 + if row['Connected'] == True: + cunk1300 = cunk1300 +1 + print(tcp13000, unk13000, cunk1300) + print('Number of clients with the TPC port at 13000:', cnt13000) print('Number of clients with the TPC port at 9000: ', cnt9000) print('Number of clients with other TCP ports: ', cntOthers) print('Number of clients without address: ', noAddrs) + print('Number of Unknown clients with 13000 TCP port:', unk13000) summ = cnt13000 + cnt9000 + cntOthers + noAddrs noPrysmPort = cnt9000 + cntOthers + noAddrs if summ != peerstoreLen: @@ -694,11 +707,15 @@ def main(): nonAttempted = 0 succeed = 0 failed = 0 + connected = 0 - print("extra metrics len:", len(extraPeerData)) - for index, row in extraPeerData.iterrows(): + print("extra metrics len:", len(rumorMetricsPanda)) + for index, row in rumorMetricsPanda.iterrows(): if row['Attempted'] == False: - nonAttempted = nonAttempted + 1 + if row['Connected'] == True: + connected = connected + 1 + else: + nonAttempted = nonAttempted + 1 else: if row['Succeed'] == False: failed = failed + 1 @@ -708,59 +725,26 @@ def main(): print("Not tried from the last peerstore copy:",nonAttempted) print("Tried and succeed:", succeed) print("Tried and failed", failed) - nonAttempted = nonAttempted + (peerstoreLen - (len(extraPeerData))) + print("Incoming connections:", connected) + nonAttempted = nonAttempted + (peerstoreLen - (len(rumorMetricsPanda))) print("Total Not tried from the entrire peerstore", nonAttempted) # get length of the peerstore peerstoreSize = getLengthOfPanda(peerstorePanda) peerMetricsSize = getLengthOfPanda(rumorMetricsPanda) - print("Attempted and succeed", succeed, "| On the metrics", peerMetricsSize) - - if succeed < peerMetricsSize: - print("- Dismatch on the extra metrics and metrics -") - for idx, row in rumorMetricsPanda.iterrows(): - index = extraPeerData.index[extraPeerData['Peer Id'] == row['Peer Id']] - extraPeerData.loc[index, 'Attempted'] = True - extraPeerData.loc[index, 'Succeed'] = True - extraPeerData.loc[index, 'Error'] = "None" - # plot the metrics gathered on the extra-metrics - nonAttempted = 0 - succeed = 0 - failed = 0 - print("\n -- Updating extra-results -- \n") - print("extra metrics len:", len(extraPeerData)) - for index, row in extraPeerData.iterrows(): - if row['Attempted'] == False: - nonAttempted = nonAttempted + 1 - else: - if row['Succeed'] == False: - failed = failed + 1 - else: - succeed = succeed + 1 - - print("Not tried from the last peerstore copy:",nonAttempted) - print("Tried and succeed:", succeed) - print("Tried and failed", failed) - nonAttempted = nonAttempted + (peerstoreLen - (len(extraPeerData))) - print("Total Not tried from the entrire peerstore", nonAttempted) - - - print("Attempted and succeed", succeed, "| On the metrics", peerMetricsSize) - if succeed != peerMetricsSize: - print("----> WARN: Random connected peers and peers on the metrics don't match") - + print("Peers in metrics", peerMetricsSize, "| On the peerstore", peerstoreSize) + if peerMetricsSize != peerstoreSize: + print("----> WARN: Peers in Peerstore and peers on the metrics don't match") + ## -- website code -- print("\n") print("Results from crawler run on [month] running for [crawling time].\n
Total amount of peers on the peerstore:", peerstoreLen,".\n
Number of clients with the TPC port at 13000 (Prysm?):", cnt13000,".\n
Percentage of 'Prysm' peers from the peerstore (based on the TCP port):", round((cnt13000*100)/peerstoreLen,2),"%.\n
We manage to connect with", succeed,"peers from the peerstore.\n
This would be the distribution.") print("\n") - - - - - xarray = [[0, succeed], [0, failed], [0, nonAttempted], [peerstoreLen, 0]] + + xarray = [[0, succeed], [0, connected], [0, failed], [0, nonAttempted], [peerstoreLen, 0]] yarray = ['Peerstore', 'Peering Results'] - labels = ['connected', 'failed', 'not attempted', 'peerstore'] + labels = ['connected', 'incoming', 'failed', 'not tried', 'peerstore'] barColor = ['tab:blue', 'tab:green'] @@ -793,18 +777,24 @@ def main(): ## Classify the non connected peers by error - errorList = getItemsFromColumn(extraPeerData, 'Error') - errorList.remove('None') - auxxarray, auxyarray = getDataFromPanda(extraPeerData, None, "Error", errorList, 'counter') + errorList = getItemsFromColumn(rumorMetricsPanda, 'Error') + try: + errorList.remove('None') + except: + pass + auxxarray, auxyarray = getDataFromPanda(rumorMetricsPanda, None, "Error", errorList, 'counter') xarray, yarray = sortArrayMaxtoMin(auxxarray, auxyarray) # Get Color Grid barColor = GetColorGridFromArray(yarray) print() + """ for idx,item in enumerate(xarray): print(item, ',', yarray[idx]) print() + """ + plotHorizontalBarsFromArrays(xarray, yarray, pdf, opts={ 'figSize': (12,7), @@ -833,12 +823,27 @@ def main(): 'tickRotation': 0, 'show': False}) + # Subgenerate a panda only with the peers that we did exchange metadata before + indexToDrop = [] + counter = 0 + auxMetricsPanda = rumorMetricsPanda.copy() + for index, row in auxMetricsPanda.iterrows(): + if row['Request Metadata'] == False: # If we didn't exchange metadata with them, remove them from the panda copy + indexToDrop.append(index) + else: + counter += 1 + #print(index, row['Client']) + + auxMetricsPanda.drop(indexToDrop, axis=0, inplace=True) + #print(auxMetricsPanda) + print("\nOrg. Metrics:", len(rumorMetricsPanda), "Should have:", counter, "Filtered:", len(auxMetricsPanda), "\n") + clientCounter = [] types = [] typesCounter = [] for idx, item in enumerate(clientList): - tcnt, tp, tpc = getTypesPerName(rumorMetricsPanda, item, 'Client', 'Version') + tcnt, tp, tpc = getTypesPerName(auxMetricsPanda, item, 'Client', 'Version') clientCounter.append(tcnt) types.append(tp) typesCounter.append(tpc) @@ -847,6 +852,10 @@ def main(): yarray = typesCounter namesarray = clientList + print(clientCounter) + print(types) + print(typesCounter) + plotDoublePieFromArray(yarray, pdf, opts={ 'figsize': figSize, 'figtitle': 'PeersPerClient.png', @@ -895,10 +904,14 @@ def main(): # Delete Prysm from the extrapoling del clientsCnt['Prysm'] print(clientsCnt) + # Get number of unknown peers that has the 13000 port + print('Number of unknown with 13000 port', unk13000) # Get total of non prysm peers nonPrysmObserved = 0 for k in clientsCnt: nonPrysmObserved = nonPrysmObserved + clientsCnt[k] + # Remove Unknown with 13000 port from nonPrysmObserved + nonPrysmObserved = nonPrysmObserved # Get for each of the clients the extrapolation to the peerstore extrapolatedClientList = {} for k in clientsCnt: @@ -950,7 +963,7 @@ def main(): auxxarray, auxyarray = getDataFromPanda(rumorMetricsPanda, None, "Country", countriesList, 'counter') print("Number of different Countries hosting Eth2 clients:", len(auxxarray)) # Remove the Countries with less than X peers - countryLimit = 10 + countryLimit = 60 xarray = [] yarray = [] for idx, item in enumerate(auxyarray): diff --git a/src/analyzer/crawler-progresion.py b/src/analyzer/crawler-progresion.py new file mode 100644 index 0000000..d0031df --- /dev/null +++ b/src/analyzer/crawler-progresion.py @@ -0,0 +1,316 @@ +# This file compiles the code to analyze the gathered metrics from each of the +# folders in the $ARMIARMAPATH/examples folder, providing a global overview of the +# Peer distributions and client types over the dayly analysis + +import os, sys +import json +import time +import pandas as pd +import matplotlib.pyplot as plt +import numpy as np +from datetime import datetime + +inittime = 0 + +def mainexecution(): + projectsFolder = sys.argv[1] + outFolder = sys.argv[2] + print("reading folder:", projectsFolder) + print("output folder:", outFolder) + outJson = outFolder + '/' + 'client-distribution.json' + + # So far, the generated panda will just contain the client count + clientDist = {'date': [], 'Lighthouse': [], 'Teku': [], 'Nimbus': [], 'Prysm': [], 'Lodestar': [], 'Unknown': []} + stimationDist = {'date': [], 'Lighthouse': [], 'Teku': [], 'Nimbus': [], 'Prysm': [], 'Lodestar': [], 'Unknown': []} + # Concatenation of the Json values + + print(projectsFolder) + for root, dirs, _ in os.walk(projectsFolder): + print(dirs) + dirs.sort() + # We just need to access the folders inside examples + for d in dirs: + fold = os.path.join(root, d) + f = fold + '/gossip-metrics.json' + cf = fold + '/custom-metrics.json' + if os.path.exists(f): + # Load the readed values from the json into the panda + poblatePandaObservedClients(clientDist, f, cf) + poblatePandaStimatedClients(stimationDist, f, cf) + print(f) + break + + # After filling the dict with all the info from the JSONs, generate the panda + df = pd.DataFrame (clientDist, columns = ['date', 'Lighthouse', 'Teku', 'Nimbus', 'Prysm', 'Lodestar', 'Unknown']) + df = df.sort_values(by="date") + print(df) + + # After filling the dict with all the info from the JSONs, generate the panda + sf = pd.DataFrame (stimationDist, columns = ['date', 'Lighthouse', 'Teku', 'Nimbus', 'Prysm', 'Lodestar', 'Unknown']) + sf = sf.sort_values(by="date") + print(sf) + + outputFigsFolder = outFolder + '/' + 'plots' + clientList = ['Lighthouse', 'Teku', 'Nimbus', 'Prysm', 'Lodestar', 'Unknown'] + figSize = (10,6) + titleSize = 24 + textSize = 20 + labelSize = 20 + # Plot the images + plotStackedChart(df, opts={ + 'figSize': figSize, + 'figTitle': 'Client-Distribution.png', + 'figtitle': 'client-distribution.png', + 'outputPath': outputFigsFolder, + 'align': 'center', + 'grid': 'y', + 'textSize': textSize, + 'title': "Evolution of the experienced client distribution", + 'ylabel': 'Client concentration (%)', + 'xlabel': None, + 'yticks': np.arange(0, 110, 10), + 'legendLabels': clientList, + 'titleSize': titleSize, + 'labelSize': labelSize, + 'lengendPosition': 1, + 'legendSize': labelSize, + 'tickRotation': 90, + 'show': True}) + + # Plot the images + plotStackedChart(sf, opts={ + 'figSize': figSize, + 'figTitle': 'Client estimation distribution', + 'figtitle': 'client-estimation-distribution.png', + 'outputPath': outputFigsFolder, + 'align': 'center', + 'grid': 'y', + 'textSize': textSize, + 'title': "Evolution of the estimated client distribution", + 'ylabel': 'Client concentration (%)', + 'xlabel': None, + 'yticks': np.arange(0, 110, 10), + 'legendLabels': clientList, + 'titleSize': titleSize, + 'labelSize': labelSize, + 'lengendPosition': 1, + 'legendSize': labelSize, + 'tickRotation': 90, + 'show': True}) + +def poblatePandaObservedClients(clientDist, jsonFile, customFile): + # Read the Json + jsf = open(jsonFile) + jsonValues = json.load(jsf) + jsf.close() + + cf = open(customFile) + cfValues = json.load(cf) + cf.close() + + global inittime + + crwlig = 0 + crwtek = 0 + crwnim = 0 + crwpry = 0 + crwlod = 0 + crwunk = 0 + cnt = 0 + for k in jsonValues: + peer = jsonValues[k] + if 'MetadataRequest' in peer: + if peer['MetadataRequest'] == True: + cnt = cnt + 1 + if 'lig' in peer['ClientType'].lower(): + crwlig = crwlig + 1 + elif 'teku' in peer['ClientType'].lower(): + crwtek = crwtek + 1 + elif 'nimbus' in peer['ClientType'].lower(): + crwnim = crwnim + 1 + elif 'prysm' in peer['ClientType'].lower(): + crwpry = crwpry + 1 + elif 'js-libp2p' in peer['ClientType'].lower(): + crwlod = crwlod + 1 + elif 'unk' in peer['ClientType'].lower(): + crwunk = crwunk + 1 + else: + crwunk = crwunk + 1 + else: + return + + print("total in metrics:", len(jsonValues)) + print("total requested:", cnt) + + if cnt == 0: + print(jsonValues) + return + + lig = round((crwlig*100)/cnt, 3) + tek = round((crwtek*100)/cnt, 3) + nim = round((crwnim*100)/cnt, 3) + pry = round((crwpry*100)/cnt, 3) + lod = round((crwlod*100)/cnt, 3) + unk = round((crwunk*100)/cnt, 3) + + if cfValues['StopTime']['Month'] < 10: + month = "0" + str(cfValues['StopTime']['Month']) + else: + month = str(cfValues['StopTime']['Month']) + + if cfValues['StopTime']['Day'] < 10: + day = "0" + str(cfValues['StopTime']['Day']) + else: + day = str(cfValues['StopTime']['Day']) + + if cfValues['StopTime']['Hour'] < 10: + hour = "0" + str(cfValues['StopTime']['Hour']) + else: + hour = str(cfValues['StopTime']['Hour']) + + if cfValues['StopTime']['Minute'] < 10: + minutes = "0" + str(cfValues['StopTime']['Minute']) + else: + minutes = str(cfValues['StopTime']['Minute']) + + + cday = str(cfValues['StopTime']['Year']) + '/' + month + '/' + day + '-' + hour + '-' + minutes + s = time.mktime(datetime.strptime(cday, "%Y/%m/%d-%H-%M").timetuple()) + if inittime == 0: + inittime = s + h = (s -inittime)/(60*60) # to get it in Hours + clientDist['date'].append(h) + clientDist['Lighthouse'].append(lig) + clientDist['Teku'].append(tek) + clientDist['Nimbus'].append(nim) + clientDist['Prysm'].append(pry) + clientDist['Lodestar'].append(lod) + clientDist['Unknown'].append(unk) + +def poblatePandaStimatedClients(stimatedDist, jsonFile, customFile): + # Read the Json + jsf = open(jsonFile) + jsonValues = json.load(jsf) + jsf.close() + + cf = open(customFile) + cfValues = json.load(cf) + cf.close() + + global inittime + + # Aux Variables + tcp13000 = 0 + + crwlig = 0 + crwtek = 0 + crwnim = 0 + crwlod = 0 + crwunk = 0 + cnt = 0 + for k in jsonValues: + peer = jsonValues[k] + if 'MetadataRequest' in peer: + if peer['MetadataRequest'] == True: + cnt = cnt + 1 + if 'lig' in peer['ClientType'].lower(): + crwlig = crwlig + 1 + elif 'teku' in peer['ClientType'].lower(): + crwtek = crwtek + 1 + elif 'nimbus' in peer['ClientType'].lower(): + crwnim = crwnim + 1 + elif 'js-libp2p' in peer['ClientType'].lower(): + crwlod = crwlod + 1 + elif 'unk' in peer['ClientType'].lower(): + crwunk = crwunk + 1 + if '/13000' in peer['Addrs']: + tcp13000 = tcp13000 + 1 + else: + return + + total = len(jsonValues) + print("total in metrics:", len(jsonValues)) + print("total tcp 13000:", tcp13000) + + res = total - tcp13000 + + noPrysm = crwlig + crwtek + crwnim + crwlod + crwunk + + if total == 0 or noPrysm == 0: + print(jsonValues) + return + + estimlig = (res*crwlig)/noPrysm + estimtek = (res*crwtek)/noPrysm + estimnim = (res*crwnim)/noPrysm + estimlod = (res*crwlod)/noPrysm + estimunk = (res*crwunk)/noPrysm + + lig = round((estimlig*100)/total, 2) + tek = round((estimtek*100)/total, 2) + nim = round((estimnim*100)/total, 2) + pry = round((tcp13000*100)/total, 2) + lod = round((estimlod*100)/total, 2) + unk = round((estimunk*100)/total, 2) + + if cfValues['StopTime']['Month'] < 10: + month = "0" + str(cfValues['StopTime']['Month']) + else: + month = str(cfValues['StopTime']['Month']) + + if cfValues['StopTime']['Day'] < 10: + day = "0" + str(cfValues['StopTime']['Day']) + else: + day = str(cfValues['StopTime']['Day']) + + if cfValues['StopTime']['Hour'] < 10: + hour = "0" + str(cfValues['StopTime']['Hour']) + else: + hour = str(cfValues['StopTime']['Hour']) + + if cfValues['StopTime']['Minute'] < 10: + minutes = "0" + str(cfValues['StopTime']['Minute']) + else: + minutes = str(cfValues['StopTime']['Minute']) + + cday = str(cfValues['StopTime']['Year']) + '/' + month + '/' + day + '-' + hour + '-' + minutes + s = time.mktime(datetime.strptime(cday, "%Y/%m/%d-%H-%M").timetuple()) + if inittime == 0: + inittime = s + h = (s -inittime)/(60*60) # to get it in Hours + stimatedDist['date'].append(h) + stimatedDist['Lighthouse'].append(lig) + stimatedDist['Teku'].append(tek) + stimatedDist['Nimbus'].append(nim) + stimatedDist['Prysm'].append(pry) + stimatedDist['Lodestar'].append(lod) + stimatedDist['Unknown'].append(unk) + + +def plotStackedChart(p, opts): + outputFile = str(opts['outputPath']) + '/' + opts['figTitle'] + + ax = p.plot.area(figsize = opts['figSize'], x='date', stacked=True) + + # labels + if opts['ylabel'] is not None: + plt.ylabel(opts['ylabel'], fontsize=opts['labelSize']) + if opts['xlabel'] is not None: + plt.xlabel(opts['xlabel'], fontsize=opts['labelSize']) + + handles,legends = ax.get_legend_handles_labels() + ax.legend(handles=handles, labels=opts['legendLabels'], loc='upper center', bbox_to_anchor=(0.5, -0.05), fancybox=True, shadow=True, ncol=6) + + if opts['grid'] != None: + ax.grid(which='major', axis=opts['grid'], linestyle='--') + + plt.yticks(opts['yticks']) + + plt.title(opts['title'], fontsize = opts['titleSize']) + plt.tight_layout() + plt.savefig(outputFile) + if opts['show'] is True: + plt.show() + +if __name__ == '__main__': + mainexecution() \ No newline at end of file diff --git a/src/analyzer/total-overview-analysis.py b/src/analyzer/total-overview-analysis.py index 35857ab..13a99d7 100644 --- a/src/analyzer/total-overview-analysis.py +++ b/src/analyzer/total-overview-analysis.py @@ -12,7 +12,8 @@ def mainexecution(): projectsFolder = sys.argv[1] outFolder = sys.argv[2] - + print("reading folder:", projectsFolder) + print("output folder:", outFolder) outJson = outFolder + '/' + 'client-distribution.json' # So far, the generated panda will just contain the client count p = {'date': [], 'Lighthouse': [], 'Teku': [], 'Nimbus': [], 'Prysm': [], 'Lodestar': [], 'Unknown': []} @@ -29,6 +30,7 @@ def mainexecution(): if os.path.exists(f): # Load the readed values from the json into the panda poblatePandaCustomMetrics(p, j, f) + print(f) break # Export the Concatenated json to the given folder diff --git a/src/control/actor/actor.go b/src/control/actor/actor.go index f8fdfe2..0ddfef9 100644 --- a/src/control/actor/actor.go +++ b/src/control/actor/actor.go @@ -180,14 +180,15 @@ func (c *ActorCmd) Cmd(route string) (cmd interface{}, err error) { switch route { case "host": cmd = &host.HostCmd{ - Base: b, - WithSetHost: &c.HostState, - PrivSettings: c, - WithEnrNode: c, - WithCloseHost: &c.HostState, - GlobalPeerstores: c.GlobalPeerstores, - CurrentPeerstore: c.CurrentPeerstore, - GossipMetrics: &c.GossipMetrics, + Base: b, + WithSetHost: &c.HostState, + PrivSettings: c, + WithEnrNode: c, + WithCloseHost: &c.HostState, + GlobalPeerstores: c.GlobalPeerstores, + CurrentPeerstore: c.CurrentPeerstore, + PeerMetadataState: &c.PeerMetadataState, + GossipMetrics: &c.GossipMetrics, } case "enr": cmd = &enr.EnrCmd{Base: b, Lazy: &c.LazyEnrState, PrivSettings: c, WithHostPriv: &c.HostState} @@ -197,11 +198,11 @@ func (c *ActorCmd) Cmd(route string) (cmd interface{}, err error) { store = nil } cmd = &peer.PeerCmd{ - Base: b, - PeerStatusState: &c.PeerStatusState, - PeerMetadataState: &c.PeerMetadataState, - Store: store, - GossipMetrics: &c.GossipMetrics, + Base: b, + PeerStatusState: &c.PeerStatusState, + PeerMetadataState: &c.PeerMetadataState, + Store: store, + GossipMetrics: &c.GossipMetrics, } case "peerstore": cmd = &peerstore.PeerstoreCmd{ diff --git a/src/control/actor/gossip/topic/export_metris.go b/src/control/actor/gossip/topic/export_metris.go index 534ed44..14bce60 100644 --- a/src/control/actor/gossip/topic/export_metris.go +++ b/src/control/actor/gossip/topic/export_metris.go @@ -2,30 +2,43 @@ package topic import ( "context" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "runtime/debug" + "strconv" "time" "github.com/libp2p/go-libp2p-core/host" "github.com/protolambda/rumor/control/actor/base" "github.com/protolambda/rumor/metrics" - "github.com/protolambda/rumor/metrics/custom" + "github.com/protolambda/rumor/metrics/custom" + "github.com/protolambda/rumor/metrics/export" "github.com/protolambda/rumor/p2p/track" ) type TopicExportMetricsCmd struct { *base.Base - GossipMetrics *metrics.GossipMetrics - GossipState *metrics.GossipState - Store track.ExtendedPeerstore - ExportPeriod time.Duration `ask:"--export-period" help:"Requets the frecuency in witch the Metrics will be exported to the files"` - FilePath string `ask:"--file-path" help:"The path of the file where to export the metrics."` - PeerstorePath string `ask:"--peerstore-path" help:"The path of the file where to export the peerstore."` - CsvPath string `ask:"--csv-file" help:"The path where the csv file will be exported"` - ExtraMetricsPath string `ask:"--extra-metrics-path" help:"The path to the csv file where the extra metrics will be exported"` - CustomMetricsPath string `ask:"--custom-metrics-path" help:"The path to the json file where the custom metrics will be exported"` + GossipMetrics *metrics.GossipMetrics + GossipState *metrics.GossipState + Store track.ExtendedPeerstore + ExportPeriod time.Duration `ask:"--export-period" help:"Requets the frecuency in witch the Metrics will be exported to the files"` + BackupPeriod time.Duration `ask:"--backup-period" help:"Requets the frecuency in witch the Backup of the Metrics will be exported"` + MetricsFolderPath string `ask:"--metrics-folder-path" help:"The path of the folder where to export the metrics."` + RawFilePath string + PeerstorePath string + CsvPath string + CustomMetricsPath string } func (c *TopicExportMetricsCmd) Defaul() { - c.ExportPeriod = 60 * time.Second + c.ExportPeriod = 24 * time.Hour + c.BackupPeriod = 30 * time.Minute + c.RawFilePath = "gossip-metrics.json" + c.CustomMetricsPath = "custom-metrics.json" + c.PeerstorePath = "peerstore.json" + c.CsvPath = "metrics.csv" } func (c *TopicExportMetricsCmd) Help() string { @@ -33,116 +46,229 @@ func (c *TopicExportMetricsCmd) Help() string { } func (c *TopicExportMetricsCmd) Run(ctx context.Context, args ...string) error { - if c.GossipState.GsNode == nil { - return NoGossipErr - } - // Generate the custom Metrics to export in Json at end of execution - customMetrics := custom.NewCustomMetrics() - c.Log.Info("Checking for existing Metrics on Project ...") - // Check if Previous GossipMetrics were generated - err, fileExists := c.GossipMetrics.ImportMetrics(c.FilePath) - if fileExists && err != nil { - c.Log.Error("Error Importing the metrics from the previous file", err) - } - if !fileExists { - c.Log.Info("Not previous metrics found, generating new ones") - } - // Check if Previous ExtraMetrics were generated - err, fileExists = c.GossipMetrics.ExtraMetrics.ImportMetrics(c.ExtraMetricsPath) - if fileExists && err != nil { - c.Log.Error("Error Importing the extra-metrics from the previous file", err) - } - if !fileExists { - c.Log.Info("Not previous extra-metrics found, generating new ones") - } - stopping := false + if c.GossipState.GsNode == nil { + return NoGossipErr + } + // Generate the custom Metrics to export in Json at end of execution + customMetrics := custom.NewCustomMetrics() + c.Log.Info("Checking for existing Metrics on Project ...") + // Check if Previous GossipMetrics were generated + err, fileExists := c.GossipMetrics.ImportMetrics(c.MetricsFolderPath) + if fileExists && err != nil { + c.Log.Error("Error Importing the metrics from the previous file", err) + } + if !fileExists { + c.Log.Info("Not previous metrics found, generating new ones") + } + c.Log.Infof("Exporting Every %d , with a backup every %d", c.ExportPeriod, c.BackupPeriod) + fmt.Println("Exporting Every ", c.ExportPeriod, " with a backup every", c.BackupPeriod) + stopping := false go func() { + t := time.Now() + fmt.Println("Initial time:", t) + c.UpdateFilesAndFolders(t) + + // loop to export the metrics every Backup and Period time for { - if stopping { - _ = c.GossipMetrics.ExportMetrics(c.FilePath, c.PeerstorePath, c.CsvPath, c.ExtraMetricsPath, c.Store) - c.Log.Infof("Metrics Export Stopped") - return - } + if stopping { + _ = c.GossipMetrics.ExportMetrics(c.RawFilePath, c.PeerstorePath, c.CsvPath, c.Store) + c.Log.Infof("Metrics Export Stopped") + h, _ := c.Host() + // Exporting the CustomMetrics for last time (still don't know which is the best place where to put this call) + err := FilCustomMetrics(c.GossipMetrics, c.Store, &customMetrics, h) + if err != nil { + fmt.Println(err) + return + } + // export the CustomMetrics into a json + err = customMetrics.ExportJson(c.CustomMetricsPath) + if err != nil { + fmt.Println(err) + return + } + return + } + start := time.Now() - c.Log.Infof("Exporting Metrics") - c.GossipMetrics.FillMetrics(c.Store) - err := c.GossipMetrics.ExportMetrics(c.FilePath, c.PeerstorePath, c.CsvPath, c.ExtraMetricsPath, c.Store) - if err != nil { - c.Log.Infof("Problems exporting the Metrics to the given file path") - } else { - ed := time.Since(start) - log := "Metrics Exported, time to export:" + ed.String() - c.Log.Infof(log) - } - exportStepDuration := time.Since(start) - if exportStepDuration < c.ExportPeriod{ - time.Sleep(c.ExportPeriod - exportStepDuration) + c.Log.Infof("Backup Export of the Metrics") + c.ExportSecuence(t, &customMetrics) + + // Check Backup period to wait for next round + exportStepDuration := time.Since(start) + if exportStepDuration < c.BackupPeriod { + fmt.Println("Waiting to run new backup export") + wt := c.BackupPeriod - exportStepDuration + fmt.Println("Waiting time:", wt) + time.Sleep(wt) + } + // Check if the Export Period has been accomplished (generate new folder for the metrics) + tnow := time.Since(t) + if tnow >= c.ExportPeriod { + c.Log.Infof("Exporting Metrics changing to Folder") + t = time.Now() + c.UpdateFilesAndFolders(t) + c.ExportSecuence(t, &customMetrics) + c.GossipMetrics.ResetDynamicMetrics() + // Force Memmory Free from the Garbage Collector + debug.FreeOSMemory() } } }() c.Control.RegisterStop(func(ctx context.Context) error { stopping = true c.Log.Infof("Stoped Exporting") - h, _ := c.Host() - // Exporting the CustomMetrics for last time (still don't know which is the best place where to put this call) - err := FilCustomMetrics(c.GossipMetrics, c.Store, &customMetrics, h) - if err != nil { - return err - } - // export the CustomMetrics into a json - err = customMetrics.ExportJson(c.CustomMetricsPath) - if err != nil { - return err - } return nil }) return nil } - // fulfil the info from the Custom Metrics -func FilCustomMetrics(gm *metrics.GossipMetrics, ps track.ExtendedPeerstore, cm *custom.CustomMetrics, h host.Host) error{ - // TODO: - Generate and do the client version stuff - - // Get total peers in peerstore - peerstoreLen := custom.TotalPeers(h) - // get the connection status for each of the peers in the extra-metrics - succeed, failed, notattempted := gm.ExtraMetrics.GetConnectionMetrics(h) - // Analyze the reported error by the connection attempts - resetbypeer, timeout, dialtoself, dialbackoff, uncertain := gm.ExtraMetrics.GetErrorCounter(h) - // Filter peers on peerstore by port - x, y, z := custom.GetPeersWithPorts(h, ps) - // Generate the MetricsDataFrame of the Current Metrics - mdf := metrics.NewMetricsDataFrame(gm.GossipMetrics) - lig := mdf.AnalyzeClientType("Lighthouse") - tek := mdf.AnalyzeClientType("Teku") - nim := mdf.AnalyzeClientType("Nimbus") - pry := mdf.AnalyzeClientType("Prysm") - lod := mdf.AnalyzeClientType("Lodestar") - unk := mdf.AnalyzeClientType("Unknown") - - // read client versions from Metrics - cm.PeerStore.SetTotal(peerstoreLen) - cm.PeerStore.SetPort13000(x) - cm.PeerStore.SetPort9000(y) - cm.PeerStore.SetPortDiff(z) - cm.PeerStore.SetNotAttempted(notattempted) - cm.PeerStore.ConnectionFailed.SetTotal(failed) - cm.PeerStore.ConnectionFailed.SetResetByPeer(resetbypeer) - cm.PeerStore.ConnectionFailed.SetTimeOut(timeout) - cm.PeerStore.ConnectionFailed.SetDialToSelf(dialtoself) - cm.PeerStore.ConnectionFailed.SetDialBackOff(dialbackoff) - cm.PeerStore.ConnectionFailed.SetUncertain(uncertain) - - // fill the CustomMetrics with the readed information - cm.PeerStore.ConnectionSucceed.SetTotal(succeed) - cm.PeerStore.ConnectionSucceed.Lighthouse = lig - cm.PeerStore.ConnectionSucceed.Teku = tek - cm.PeerStore.ConnectionSucceed.Nimbus = nim - cm.PeerStore.ConnectionSucceed.Prysm = pry - cm.PeerStore.ConnectionSucceed.Lodestar = lod - cm.PeerStore.ConnectionSucceed.Unknown = unk - - return nil +func FilCustomMetrics(gm *metrics.GossipMetrics, ps track.ExtendedPeerstore, cm *custom.CustomMetrics, h host.Host) error { + // Get total peers in peerstore + peerstoreLen := custom.TotalPeers(h) + // get the connection status for each of the peers in the extra-metrics + succeed, failed, notattempted := gm.GetConnectionMetrics(h) + // Analyze the reported error by the connection attempts + resetbypeer, timeout, dialtoself, dialbackoff, uncertain := gm.GetErrorCounter(h) + // Filter peers on peerstore by port + x, y, z := custom.GetPeersWithPorts(h, ps) + // Generate the MetricsDataFrame of the Current Metrics + mdf := export.NewMetricsDataFrame(gm.GossipMetrics) + lig := mdf.AnalyzeClientType("Lighthouse") + tek := mdf.AnalyzeClientType("Teku") + nim := mdf.AnalyzeClientType("Nimbus") + pry := mdf.AnalyzeClientType("Prysm") + lod := mdf.AnalyzeClientType("Lodestar") + unk := mdf.AnalyzeClientType("Unknown") + + // read client versions from Metrics + cm.PeerStore.SetTotal(peerstoreLen) + cm.PeerStore.SetPort13000(x) + cm.PeerStore.SetPort9000(y) + cm.PeerStore.SetPortDiff(z) + cm.PeerStore.SetNotAttempted(notattempted) + cm.PeerStore.ConnectionFailed.SetTotal(failed) + cm.PeerStore.ConnectionFailed.SetResetByPeer(resetbypeer) + cm.PeerStore.ConnectionFailed.SetTimeOut(timeout) + cm.PeerStore.ConnectionFailed.SetDialToSelf(dialtoself) + cm.PeerStore.ConnectionFailed.SetDialBackOff(dialbackoff) + cm.PeerStore.ConnectionFailed.SetUncertain(uncertain) + + // fill the CustomMetrics with the readed information + cm.PeerStore.ConnectionSucceed.SetTotal(succeed) + cm.PeerStore.ConnectionSucceed.Lighthouse = lig + cm.PeerStore.ConnectionSucceed.Teku = tek + cm.PeerStore.ConnectionSucceed.Nimbus = nim + cm.PeerStore.ConnectionSucceed.Prysm = pry + cm.PeerStore.ConnectionSucceed.Lodestar = lod + cm.PeerStore.ConnectionSucceed.Unknown = unk + + // fill the json with client distribution from those peers we got the metadata request from + mtlig := mdf.AnalyzeClientTypeIfMetadataRequested("Lighthouse") + mttek := mdf.AnalyzeClientTypeIfMetadataRequested("Teku") + mtnim := mdf.AnalyzeClientTypeIfMetadataRequested("Nimbus") + mtpry := mdf.AnalyzeClientTypeIfMetadataRequested("Prysm") + mtlod := mdf.AnalyzeClientTypeIfMetadataRequested("Lodestar") + mtunk := mdf.AnalyzeClientTypeIfMetadataRequested("Unknown") + + tot := mtlig.Total + mttek.Total + mtnim.Total + mtpry.Total + mtlod.Total + mtunk.Total + + // fill the CustomMetrics with the readed information + cm.PeerStore.MetadataRequested.SetTotal(tot) + cm.PeerStore.MetadataRequested.Lighthouse = mtlig + cm.PeerStore.MetadataRequested.Teku = mttek + cm.PeerStore.MetadataRequested.Nimbus = mtnim + cm.PeerStore.MetadataRequested.Prysm = mtpry + cm.PeerStore.MetadataRequested.Lodestar = mtlod + cm.PeerStore.MetadataRequested.Unknown = mtunk + + return nil +} + +func (c *TopicExportMetricsCmd) UpdateFilesAndFolders(t time.Time) { + year := strconv.Itoa(t.Year()) + month := t.Month().String() + day := strconv.Itoa(t.Day()) + h := t.Hour() + var hour string + if h < 10 { + hour = "0" + strconv.Itoa(h) + } else { + hour = strconv.Itoa(h) + } + m := t.Minute() + var minute string + if m < 10 { + minute = "0" + strconv.Itoa(m) + } else { + minute = strconv.Itoa(m) + } + date := year + "-" + month + "-" + day + "-" + hour + ":" + minute + folderName := c.MetricsFolderPath + "/" + "metrics" + "/" + date + // generate new metrics folder + if _, err := os.Stat(folderName); os.IsNotExist(err) { + c.Log.Warnf("making folder:", folderName) + os.Mkdir(folderName, 0770) + } + // complete file path + c.RawFilePath = folderName + "/" + "gossip-metrics.json" + c.CustomMetricsPath = folderName + "/" + "custom-metrics.json" + c.PeerstorePath = folderName + "/" + "peerstore.json" + c.CsvPath = folderName + "/" + "metrics.csv" + c.Log.Warnf("New exporting folder:", folderName) + + // Update the last checkpoint + err := GenCheckpointFile(c.MetricsFolderPath, date) + if err != nil { + c.Log.Warn(err) + fmt.Println(err) + } +} + +func (c *TopicExportMetricsCmd) ExportSecuence(start time.Time, cm *custom.CustomMetrics) { + // Export The metrics + fmt.Println("exporting metrics") + c.GossipMetrics.FillMetrics(c.Store) + err := c.GossipMetrics.ExportMetrics(c.RawFilePath, c.PeerstorePath, c.CsvPath, c.Store) + if err != nil { + c.Log.Infof("Problems exporting the Metrics to the given file path") + } else { + ed := time.Since(start) + log := "Metrics Exported, time to export:" + ed.String() + c.Log.Infof(log) + } + // Export the Custom metrics + h, _ := c.Host() + // Exporting the CustomMetrics for last time (still don't know which is the best place where to put this call) + err = FilCustomMetrics(c.GossipMetrics, c.Store, cm, h) + if err != nil { + c.Log.Warn(err) + } + // export the CustomMetrics into a json + err = cm.ExportJson(c.CustomMetricsPath) + if err != nil { + c.Log.Warn(err) + } +} + +// Function that writes in a file the folder name of the last checkpoint generated in the project +// DOUBT: Write path relative or absolute? dunno +func GenCheckpointFile(cpPath string, lastCP string) error { + cp := metrics.Checkpoint{ + Checkpoint: lastCP, + } + cpFile := cpPath + "/metrics/checkpoint-folder.json" + fmt.Println("Checkpoint File:", cpFile) + jb, err := json.Marshal(cp) + if err != nil { + fmt.Println("Error Marshalling last Checkpoint") + return err + } + err = ioutil.WriteFile(cpFile, jb, 0644) + if err != nil { + fmt.Println("Error opening file: ", cpFile) + return err + } + return nil } diff --git a/src/control/actor/gossip/topic/log.go b/src/control/actor/gossip/topic/log.go index 58bddfe..21b055d 100644 --- a/src/control/actor/gossip/topic/log.go +++ b/src/control/actor/gossip/topic/log.go @@ -9,8 +9,8 @@ import ( "time" "github.com/golang/snappy" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p-core/peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/protolambda/rumor/control/actor/base" "github.com/protolambda/rumor/metrics" "github.com/protolambda/rumor/p2p/gossip" diff --git a/src/control/actor/host/host.go b/src/control/actor/host/host.go index ebd203c..2e38622 100644 --- a/src/control/actor/host/host.go +++ b/src/control/actor/host/host.go @@ -4,6 +4,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/protolambda/ask" "github.com/protolambda/rumor/control/actor/base" + "github.com/protolambda/rumor/control/actor/peer/metadata" "github.com/protolambda/rumor/metrics" "github.com/protolambda/rumor/p2p/track" ) @@ -22,7 +23,8 @@ type HostCmd struct { GlobalPeerstores track.Peerstores CurrentPeerstore track.DynamicPeerstore - GossipMetrics *metrics.GossipMetrics + PeerMetadataState *metadata.PeerMetadataState + GossipMetrics *metrics.GossipMetrics WithSetHost WithCloseHost @@ -42,7 +44,7 @@ func (c *HostCmd) Cmd(route string) (cmd interface{}, err error) { case "listen": cmd = &HostListenCmd{Base: c.Base, WithEnrNode: c.WithEnrNode} case "notify": - cmd = &HostNotifyCmd{Base: c.Base, GossipMetrics: c.GossipMetrics} + cmd = &HostNotifyCmd{Base: c.Base, GossipMetrics: c.GossipMetrics, Store: c.CurrentPeerstore, PeerMetadataState: c.PeerMetadataState} default: return nil, ask.UnrecognizedErr } diff --git a/src/control/actor/host/notify.go b/src/control/actor/host/notify.go index b4d42f2..132c9c7 100644 --- a/src/control/actor/host/notify.go +++ b/src/control/actor/host/notify.go @@ -8,13 +8,17 @@ import ( "github.com/libp2p/go-libp2p-core/network" ma "github.com/multiformats/go-multiaddr" "github.com/protolambda/rumor/control/actor/base" + "github.com/protolambda/rumor/control/actor/peer/metadata" "github.com/protolambda/rumor/metrics" + "github.com/protolambda/rumor/p2p/track" "github.com/sirupsen/logrus" ) type HostNotifyCmd struct { *base.Base *metrics.GossipMetrics + *metadata.PeerMetadataState + Store track.ExtendedPeerstore } func (c *HostNotifyCmd) Help() string { @@ -49,8 +53,10 @@ func (c *HostNotifyCmd) listenCloseF(net network.Network, addr ma.Multiaddr) { } func (c *HostNotifyCmd) connectedF(net network.Network, conn network.Conn) { - c.GossipMetrics.AddNewPeer(conn.RemotePeer()) + _ = c.GossipMetrics.AddNewPeer(conn.RemotePeer()) c.GossipMetrics.AddConnectionEvent(conn.RemotePeer(), "Connection") + // request metadata as soon as we connect to a peer + metrics.PollPeerMetadata(conn.RemotePeer(), c.Base, c.PeerMetadataState, c.Store, c.GossipMetrics) // End of metric traces to track the connections and disconnections c.Log.WithFields(logrus.Fields{ diff --git a/src/control/actor/peer/add.go b/src/control/actor/peer/add.go index 46cf893..755f133 100644 --- a/src/control/actor/peer/add.go +++ b/src/control/actor/peer/add.go @@ -4,11 +4,12 @@ import ( "context" "errors" "fmt" + "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/protolambda/rumor/control/actor/base" "github.com/protolambda/rumor/control/actor/flags" "github.com/protolambda/rumor/p2p/track" - "time" ) type PeerAddCmd struct { diff --git a/src/control/actor/peer/connectrandom.go b/src/control/actor/peer/connectrandom.go index 6098e21..2c70fe5 100644 --- a/src/control/actor/peer/connectrandom.go +++ b/src/control/actor/peer/connectrandom.go @@ -2,6 +2,7 @@ package peer import ( "context" + "fmt" "math/rand" "time" @@ -29,8 +30,8 @@ type PeerConnectRandomCmd struct { func (c *PeerConnectRandomCmd) Default() { c.Timeout = 15 * time.Second - c.Rescan = 1 * time.Minute - c.MaxRetries = 5 + c.Rescan = 10 * time.Minute + c.MaxRetries = 3 c.FilterPort = -1 } @@ -44,7 +45,7 @@ func (c *PeerConnectRandomCmd) Run(ctx context.Context, args ...string) error { if err != nil { return err } - + fmt.Println("Peerstore Rescan Every:", c.Rescan) bgCtx, bgCancel := context.WithCancel(context.Background()) done := make(chan struct{}) go func() { @@ -55,6 +56,7 @@ func (c *PeerConnectRandomCmd) Run(ctx context.Context, args ...string) error { c.Control.RegisterStop(func(ctx context.Context) error { bgCancel() c.Log.Infof("Stopped auto-connecting") + fmt.Println("Stop Autoconnected") <-done return nil }) @@ -78,83 +80,90 @@ func (c *PeerConnectRandomCmd) run(ctx context.Context, h host.Host) { defer close(quit) // set up the loop where every given time we will stop it to refresh the peerstore go func() { + loopCount := 0 + peerCache := make(map[peer.ID]bool) for quit != nil { - reset := make(chan struct{}) - go func() { + if loopCount >= 10 { + fmt.Println("Reseting cache") // generate a "cache of peers in this raw" - peerCache := make(map[peer.ID]bool) - // make the first copy of the peerstore - p := h.Peerstore() - peerList := p.Peers() - c.Log.Infof("the peerstore has been re-scanned") - peerstoreLen := len(peerList) - c.Log.Infof("len peerlist: %s", peerstoreLen) - // loop to attempt connetions for the given time - for reset != nil { - p := randomPeer(peerList) - // loop until we arrive to a peer that we didn't connect before - exists := c.GossipMetrics.ExtraMetrics.AddNewPeer(p) - if exists == true { - connected := c.GossipMetrics.ExtraMetrics.CheckIdConnected(p) - if connected == true { - continue - } else if len(peerCache) == peerstoreLen { - return // Temporary commented - } else { // if we didn't crawl all the peers in the peerstore, don't loose time trying to reconnect failed peers in the past - continue - } - } - // add peer to the peerCache for this round - peerCache[p] = true - // Set the correct address format to connect the peers - // libp2p complains if we put multi-addresses that include the peer ID into the Addrs list. - addrs := c.Store.Addrs(p) - addrInfo := peer.AddrInfo{ - ID: p, - Addrs: make([]ma.Multiaddr, 0, len(addrs)), + peerCache = make(map[peer.ID]bool) + loopCount = 0 + } + // make the first copy of the peerstore + p := h.Peerstore() + peerList := p.Peers() + c.Log.Infof("the peerstore has been re-scanned") + peerstoreLen := len(peerList) + c.Log.Infof("len peerlist: %s", peerstoreLen) + fmt.Println("Peerstore Re-Scanned with", peerstoreLen, "peers") + t := time.Now() + // loop to attempt connetions for the given time + tgap := time.Since(t) + for tgap < c.Rescan { + p := randomPeer(peerList) + // loop until we arrive to a peer that we didn't connect before + _ = c.GossipMetrics.AddNewPeer(p) + _, ok := peerCache[p] + if ok { + if len(peerCache) == peerstoreLen { + break // Temporary commented } - for _, m := range addrs { - transport, _ := peer.SplitAddr(m) - if transport == nil { - continue - } - addrInfo.Addrs = append(addrInfo.Addrs, transport) + continue + } + // add peer to the peerCache for this round + peerCache[p] = true + // Set the correct address format to connect the peers + // libp2p complains if we put multi-addresses that include the peer ID into the Addrs list. + addrs := c.Store.Addrs(p) + addrInfo := peer.AddrInfo{ + ID: p, + Addrs: make([]ma.Multiaddr, 0, len(addrs)), + } + for _, m := range addrs { + transport, _ := peer.SplitAddr(m) + if transport == nil { + continue } - ctx, _ := context.WithTimeout(ctx, c.Timeout) - c.Log.Warnf("addrs %s attempting connection to peer", addrInfo.Addrs) - // try to connect the peer - attempts := 0 - for attempts <= c.MaxRetries { - if err := h.Connect(ctx, addrInfo); err != nil { - // the connetion failed - attempts += 1 - c.GossipMetrics.ExtraMetrics.AddNewAttempt(p, false, err.Error()) - c.Log.WithError(err).Warnf("attempts %d failed connection attempt", attempts) - } else { // connection successfuly made - c.Log.Infof("peer_id %s successful connection made", p) - c.GossipMetrics.ExtraMetrics.AddNewAttempt(p, true, "None") - // break the loop - break - } - if attempts > c.MaxRetries { - c.Log.Warnf("attempts %d failed connection attempt, reached maximum, no retry", attempts) - } + addrInfo.Addrs = append(addrInfo.Addrs, transport) + } + ctx, _ := context.WithTimeout(ctx, c.Timeout) + c.Log.Warnf("addrs %s attempting connection to peer", addrInfo.Addrs) + // try to connect the peer + attempts := 0 + for attempts <= c.MaxRetries { + if err := h.Connect(ctx, addrInfo); err != nil { + // the connetion failed + attempts += 1 + c.GossipMetrics.AddNewConnectionAttempt(p, false, err.Error()) + c.Log.WithError(err).Warnf("attempts %d failed connection attempt", attempts) + continue + } else { // connection successfuly made + c.Log.Infof("peer_id %s successful connection made", p) + c.GossipMetrics.AddNewConnectionAttempt(p, true, "None") + // break the loop + break } - // if the reset flag is active, kill the go-routine - if reset == nil { - c.Log.Infof("Channel reset has been closed") + if attempts > c.MaxRetries { + c.Log.Warnf("attempts %d failed connection attempt, reached maximum, no retry", attempts) + break } - } - }() - time.Sleep(c.Rescan) - close(reset) + tgap = time.Since(t) + + } + + fmt.Println("Restarting the peering") + fmt.Println("Peer attempted from the last reset:", len(peerCache)) // Check if we have received any quit signal if quit == nil { c.Log.Infof("Channel Quit has been closed") + fmt.Println("Quit has been closed") + break } + loopCount += 1 } c.Log.Infof("Go routine to randomly connect has been canceled") + fmt.Println("Go routine to randomly connect has been canceled") }() } diff --git a/src/control/actor/peer/metadata/follow.go b/src/control/actor/peer/metadata/follow.go index d94cf20..d7dcef9 100644 --- a/src/control/actor/peer/metadata/follow.go +++ b/src/control/actor/peer/metadata/follow.go @@ -2,6 +2,7 @@ package metadata import ( "context" + "github.com/protolambda/rumor/control/actor/base" "github.com/sirupsen/logrus" ) diff --git a/src/control/actor/peer/metadata/get.go b/src/control/actor/peer/metadata/get.go index cd6f335..b7f8680 100644 --- a/src/control/actor/peer/metadata/get.go +++ b/src/control/actor/peer/metadata/get.go @@ -2,6 +2,7 @@ package metadata import ( "context" + "github.com/protolambda/rumor/control/actor/base" "github.com/sirupsen/logrus" ) diff --git a/src/control/actor/peer/metadata/metadata.go b/src/control/actor/peer/metadata/metadata.go index 782a484..8d96c57 100644 --- a/src/control/actor/peer/metadata/metadata.go +++ b/src/control/actor/peer/metadata/metadata.go @@ -3,6 +3,7 @@ package metadata import ( "context" "errors" + "github.com/libp2p/go-libp2p-core/peer" "github.com/protolambda/ask" "github.com/protolambda/rumor/control/actor/base" diff --git a/src/control/actor/peer/metadata/ping.go b/src/control/actor/peer/metadata/ping.go index 43915ad..413dccc 100644 --- a/src/control/actor/peer/metadata/ping.go +++ b/src/control/actor/peer/metadata/ping.go @@ -3,6 +3,8 @@ package metadata import ( "context" "fmt" + "time" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -12,7 +14,6 @@ import ( "github.com/protolambda/rumor/p2p/track" "github.com/protolambda/zrnt/eth2/beacon" "github.com/sirupsen/logrus" - "time" ) type PeerMetadataPingCmd struct { diff --git a/src/control/actor/peer/metadata/poll.go b/src/control/actor/peer/metadata/poll.go index 8968179..fef2a86 100644 --- a/src/control/actor/peer/metadata/poll.go +++ b/src/control/actor/peer/metadata/poll.go @@ -2,13 +2,14 @@ package metadata import ( "context" + "sync" + "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/protolambda/rumor/control/actor/base" "github.com/protolambda/rumor/control/actor/flags" "github.com/protolambda/rumor/p2p/rpc/reqresp" "github.com/protolambda/rumor/p2p/track" - "sync" - "time" ) type PeerMetadataPollCmd struct { diff --git a/src/control/actor/peer/metadata/pong.go b/src/control/actor/peer/metadata/pong.go index 2519b07..15df411 100644 --- a/src/control/actor/peer/metadata/pong.go +++ b/src/control/actor/peer/metadata/pong.go @@ -2,6 +2,8 @@ package metadata import ( "context" + "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/protolambda/rumor/control/actor/base" @@ -10,7 +12,6 @@ import ( "github.com/protolambda/rumor/p2p/rpc/reqresp" "github.com/protolambda/rumor/p2p/track" "github.com/protolambda/zrnt/eth2/beacon" - "time" ) type PeerMetadataPongCmd struct { diff --git a/src/control/actor/peer/metadata/req.go b/src/control/actor/peer/metadata/req.go index 298d40d..bdd8712 100644 --- a/src/control/actor/peer/metadata/req.go +++ b/src/control/actor/peer/metadata/req.go @@ -3,12 +3,13 @@ package metadata import ( "context" "fmt" + "time" + "github.com/protolambda/rumor/control/actor/base" "github.com/protolambda/rumor/control/actor/flags" "github.com/protolambda/rumor/p2p/rpc/reqresp" "github.com/protolambda/rumor/p2p/track" "github.com/sirupsen/logrus" - "time" ) type PeerMetadataReqCmd struct { diff --git a/src/control/actor/peer/metadata/serve.go b/src/control/actor/peer/metadata/serve.go index 8d430b1..8e6578a 100644 --- a/src/control/actor/peer/metadata/serve.go +++ b/src/control/actor/peer/metadata/serve.go @@ -2,13 +2,14 @@ package metadata import ( "context" + "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/protolambda/rumor/control/actor/base" "github.com/protolambda/rumor/control/actor/flags" "github.com/protolambda/rumor/p2p/rpc/methods" "github.com/protolambda/rumor/p2p/rpc/reqresp" - "time" ) type PeerMetadataServeCmd struct { diff --git a/src/control/actor/peer/metadata/set.go b/src/control/actor/peer/metadata/set.go index 7d08706..91d07b2 100644 --- a/src/control/actor/peer/metadata/set.go +++ b/src/control/actor/peer/metadata/set.go @@ -2,6 +2,7 @@ package metadata import ( "context" + "github.com/protolambda/rumor/control/actor/base" "github.com/protolambda/zrnt/eth2/beacon" "github.com/sirupsen/logrus" diff --git a/src/control/session.go b/src/control/session.go index 80e09b1..ea83d95 100644 --- a/src/control/session.go +++ b/src/control/session.go @@ -5,12 +5,7 @@ import ( "encoding/json" "errors" "fmt" - "github.com/protolambda/rumor/control/actor" - "github.com/sirupsen/logrus" "io" - "mvdan.cc/sh/v3/expand" - "mvdan.cc/sh/v3/interp" - "mvdan.cc/sh/v3/syntax" "os" "path" "path/filepath" @@ -19,6 +14,12 @@ import ( "sync" "sync/atomic" "time" + + "github.com/protolambda/rumor/control/actor" + "github.com/sirupsen/logrus" + "mvdan.cc/sh/v3/expand" + "mvdan.cc/sh/v3/interp" + "mvdan.cc/sh/v3/syntax" ) type EnvGlobal interface { diff --git a/src/crawler/crawler.rumor b/src/crawler/crawler.rumor index 5d7662b..2f6e87f 100644 --- a/src/crawler/crawler.rumor +++ b/src/crawler/crawler.rumor @@ -50,6 +50,8 @@ armiarma: include dv5.rumor armiarma: include export_metrics.rumor +#sleep 30s + #armiarma: include connectall.rumor armiarma: include random-connect.rumor diff --git a/src/crawler/export_metrics.rumor b/src/crawler/export_metrics.rumor index 5f31099..5b656d6 100644 --- a/src/crawler/export_metrics.rumor +++ b/src/crawler/export_metrics.rumor @@ -10,14 +10,5 @@ else mkdir "${metricsFolder}/metrics" fi -# Set the names of the files that will be pased as arg to the command -filepath="${metricsFolder}/metrics/gossip-metrics.json" -peerstorepath="${metricsFolder}/metrics/peerstore.json" -customMetricsPath="${metricsFolder}/metrics/custom-metrics.json" -csvpath="${metricsFolder}/metrics/metrics.csv" -extraMetricsPath="${metricsFolder}/metrics/extra-metrics.csv" -echo "gossip metrics file: $filepath" -echo "peerstore file: $peerstorepath" - -gossip topic export-metrics --file-path="$filepath" --peerstore-path="$peerstorepath" --custom-metrics-path="$customMetricsPath" --csv-file="$csvpath" --extra-metrics-path="$extraMetricsPath" --export-period=10s +gossip topic export-metrics --metrics-folder-path="$metricsFolder" --export-period=5m --backup-period=1m \ No newline at end of file diff --git a/src/crawler/polling.rumor b/src/crawler/polling.rumor index f0677d1..b8b1845 100644 --- a/src/crawler/polling.rumor +++ b/src/crawler/polling.rumor @@ -7,8 +7,8 @@ peer status poll --interval=12s --timeout=3s --compression=snappy peer status poll --interval=12s --timeout=3s --compression=none # Ping everyone every 30s, and update metadata if they have new data for us -peer metadata poll --interval=30s --timeout=10s --update=true --update-timeout=20s --compression=snappy -peer metadata poll --interval=30s --timeout=10s --update=true --update-timeout=20s --compression=none +peer metadata poll --interval=15s --timeout=5s --update=true --update-timeout=10s --compression=snappy +peer metadata poll --interval=15s --timeout=5s --update=true --update-timeout=10s --compression=none # Serve pong requests, also update if we get a higher ping than known peer metadata pong --update=true --compression=snappy diff --git a/src/delete b/src/delete deleted file mode 100755 index 8fa5856..0000000 Binary files a/src/delete and /dev/null differ diff --git a/src/main.go b/src/main.go index 84d5dc8..e9dfbd8 100644 --- a/src/main.go +++ b/src/main.go @@ -2,13 +2,21 @@ package main import ( "fmt" + "log" + "os" + "github.com/protolambda/rumor/sh" "github.com/spf13/cobra" - "os" + + "net/http" + _ "net/http/pprof" ) func main() { - + // For memmory profiling + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() mainCmd := cobra.Command{ Use: "rumor", Short: "Start Rumor", diff --git a/src/metrics/custom/metadata-requested.go b/src/metrics/custom/metadata-requested.go new file mode 100644 index 0000000..314e429 --- /dev/null +++ b/src/metrics/custom/metadata-requested.go @@ -0,0 +1,28 @@ +package custom + +type MetadataRequested struct { + Total int + Lighthouse Client + Teku Client + Nimbus Client + Prysm Client + Lodestar Client + Unknown Client +} + +func NewMetadataRequesed() MetadataRequested { + mtreq := MetadataRequested{ + Total: 0, + Lighthouse: NewClient(), + Teku: NewClient(), + Nimbus: NewClient(), + Prysm: NewClient(), + Lodestar: NewClient(), + Unknown: NewClient(), + } + return mtreq +} + +func (mtreq *MetadataRequested) SetTotal(v int) { + mtreq.Total = v +} diff --git a/src/metrics/custom/peerstore.go b/src/metrics/custom/peerstore.go index 66f4c63..fe22e19 100644 --- a/src/metrics/custom/peerstore.go +++ b/src/metrics/custom/peerstore.go @@ -1,26 +1,26 @@ package custom -import () - type PeerStore struct { - Total int - Port13000 int - Port9000 int - PortDiff int + Total int + Port13000 int + Port9000 int + PortDiff int NoAttemptedConnection int - ConnectionFailed ConnectionFailed - ConnectionSucceed ConnectionSucceed + ConnectionFailed ConnectionFailed + ConnectionSucceed ConnectionSucceed + MetadataRequested MetadataRequested } func NewPeerStore() PeerStore { - ps := PeerStore { - Total: 0, - Port13000: 0, - Port9000: 0, - PortDiff: 0, + ps := PeerStore{ + Total: 0, + Port13000: 0, + Port9000: 0, + PortDiff: 0, NoAttemptedConnection: 0, - ConnectionFailed: NewConnectionFailed(), - ConnectionSucceed: NewConnectionSucceed(), + ConnectionFailed: NewConnectionFailed(), + ConnectionSucceed: NewConnectionSucceed(), + MetadataRequested: NewMetadataRequesed(), } return ps } @@ -44,4 +44,3 @@ func (ps *PeerStore) SetPortDiff(t int) { func (ps *PeerStore) SetNotAttempted(t int) { ps.NoAttemptedConnection = t } - diff --git a/src/metrics/custom/utils.go b/src/metrics/custom/utils.go index c960208..efbc04b 100644 --- a/src/metrics/custom/utils.go +++ b/src/metrics/custom/utils.go @@ -1,7 +1,6 @@ package custom import ( - "fmt" "strings" "github.com/libp2p/go-libp2p-core/host" @@ -10,17 +9,17 @@ import ( // ---- Ports and Peers in Peerstore ---- -// Read the address for each of the peers in the peerstore +// Read the address for each of the peers in the peerstore // counting the number of peers (total), peers with ports 13000, 9000 or others func TotalPeers(h host.Host) int { p := h.Peerstore() peerList := p.Peers() return len(peerList) -} +} // returns number of peers with ports exposed in address // (0) -> 13000 | (1) -> 9000 | (2) -> Others -func GetPeersWithPorts(h host.Host, ep track.ExtendedPeerstore) (int, int, int){ +func GetPeersWithPorts(h host.Host, ep track.ExtendedPeerstore) (int, int, int) { x := 0 // port 13000 y := 0 // port 9000 z := 0 // other ports @@ -37,10 +36,5 @@ func GetPeersWithPorts(h host.Host, ep track.ExtendedPeerstore) (int, int, int){ } } } - w := x + y + z - if w != len(peerList) { - fmt.Println("Peerstore at host and ExtendedPeerstore don't match", w, len(peerList)) - } return x, y, z } - diff --git a/src/metrics/address.go b/src/metrics/export/address.go similarity index 91% rename from src/metrics/address.go rename to src/metrics/export/address.go index fb4c24e..63766c6 100644 --- a/src/metrics/address.go +++ b/src/metrics/export/address.go @@ -1,4 +1,4 @@ -package metrics +package export type AddressList []string diff --git a/src/metrics/export/attempted.go b/src/metrics/export/attempted.go new file mode 100644 index 0000000..6a44be5 --- /dev/null +++ b/src/metrics/export/attempted.go @@ -0,0 +1,22 @@ +package export + +type AttemptedList []bool + +// add new item to the list +func (l *AttemptedList) AddItem(newItem bool) { + *l = append(*l, newItem) +} + +// get item form the list by index +func (l *AttemptedList) GetByIndex(idx int) bool { + return (*l)[idx] +} + +// get the array sorted by list of indexes +func (l AttemptedList) GetArrayByIndexes(idxs []int) []bool { + var sortedArray []bool + for _, i := range idxs { + sortedArray = append(sortedArray, l[i]) + } + return sortedArray +} diff --git a/src/metrics/export/attempts.go b/src/metrics/export/attempts.go new file mode 100644 index 0000000..3d330e2 --- /dev/null +++ b/src/metrics/export/attempts.go @@ -0,0 +1,22 @@ +package export + +type AttemptsList []int + +// add new item to the list +func (l *AttemptsList) AddItem(newItem int) { + *l = append(*l, newItem) +} + +// get item form the list by index +func (l *AttemptsList) GetByIndex(idx int) int { + return (*l)[idx] +} + +// get the array sorted by list of indexes +func (l AttemptsList) GetArrayByIndexes(idxs []int) []int { + var sortedArray []int + for _, i := range idxs { + sortedArray = append(sortedArray, l[i]) + } + return sortedArray +} diff --git a/src/metrics/city.go b/src/metrics/export/city.go similarity index 91% rename from src/metrics/city.go rename to src/metrics/export/city.go index 454c301..700167e 100644 --- a/src/metrics/city.go +++ b/src/metrics/export/city.go @@ -1,4 +1,4 @@ -package metrics +package export type CityList []string diff --git a/src/metrics/client_type.go b/src/metrics/export/client_type.go similarity index 96% rename from src/metrics/client_type.go rename to src/metrics/export/client_type.go index 3739f41..c354093 100644 --- a/src/metrics/client_type.go +++ b/src/metrics/export/client_type.go @@ -1,4 +1,4 @@ -package metrics +package export type ClientTypeList []string diff --git a/src/metrics/client_version.go b/src/metrics/export/client_version.go similarity index 96% rename from src/metrics/client_version.go rename to src/metrics/export/client_version.go index 9bfd2ff..b68ddcd 100644 --- a/src/metrics/client_version.go +++ b/src/metrics/export/client_version.go @@ -1,4 +1,4 @@ -package metrics +package export type ClientVersionList []string @@ -19,4 +19,4 @@ func (cv ClientVersionList) GetArrayByIndexes(idxs []int) []string { sortedArray = append(sortedArray, cv[i]) } return sortedArray -} \ No newline at end of file +} diff --git a/src/metrics/export/connected.go b/src/metrics/export/connected.go new file mode 100644 index 0000000..9f1eb07 --- /dev/null +++ b/src/metrics/export/connected.go @@ -0,0 +1,22 @@ +package export + +type ConnectedList []bool + +// add new item to the list +func (l *ConnectedList) AddItem(newItem bool) { + *l = append(*l, newItem) +} + +// get item form the list by index +func (l *ConnectedList) GetByIndex(idx int) bool { + return (*l)[idx] +} + +// get the array sorted by list of indexes +func (l ConnectedList) GetArrayByIndexes(idxs []int) []bool { + var sortedArray []bool + for _, i := range idxs { + sortedArray = append(sortedArray, l[i]) + } + return sortedArray +} diff --git a/src/metrics/connected_time.go b/src/metrics/export/connected_time.go similarity index 94% rename from src/metrics/connected_time.go rename to src/metrics/export/connected_time.go index d9f77fd..9cf604c 100644 --- a/src/metrics/connected_time.go +++ b/src/metrics/export/connected_time.go @@ -1,4 +1,4 @@ -package metrics +package export type ConnectedTimeList []float64 // in minutes diff --git a/src/metrics/connection.go b/src/metrics/export/connection.go similarity index 93% rename from src/metrics/connection.go rename to src/metrics/export/connection.go index 9b2b544..f9da1bb 100644 --- a/src/metrics/connection.go +++ b/src/metrics/export/connection.go @@ -1,4 +1,4 @@ -package metrics +package export type ConnectionList []int64 diff --git a/src/metrics/country.go b/src/metrics/export/country.go similarity index 93% rename from src/metrics/country.go rename to src/metrics/export/country.go index 18d9d3f..e34c036 100644 --- a/src/metrics/country.go +++ b/src/metrics/export/country.go @@ -1,4 +1,4 @@ -package metrics +package export type CountryList []string diff --git a/src/metrics/dataframe.go b/src/metrics/export/dataframe.go similarity index 64% rename from src/metrics/dataframe.go rename to src/metrics/export/dataframe.go index 2826d8b..2e94c64 100644 --- a/src/metrics/dataframe.go +++ b/src/metrics/export/dataframe.go @@ -1,13 +1,14 @@ -package metrics +package export import ( "fmt" "os" + "sort" "strconv" "sync" - "sort" - "github.com/protolambda/rumor/metrics/custom" + "github.com/protolambda/rumor/metrics/custom" + "github.com/protolambda/rumor/metrics/utils" ) // Main Data Structure that will be used to analyze and plot the metrics @@ -25,6 +26,17 @@ type MetricsDataFrame struct { Cities CityList Latencies LatencyList + // Connection Related + Attempted AttemptedList + Succeed SucceedList + Connected ConnectedList + Attempts AttemptsList + Errors ErrorList + + // Metadata Related + RequestedMetadata RequestedMetadataList + SuccessMetadata SuccessMetadataList + // Metrics Related Connections ConnectionList Disconnections DisconnectionList @@ -48,12 +60,12 @@ func NewMetricsDataFrame(metricsCopy sync.Map) *MetricsDataFrame { // Initialize the DataFrame with the expoting time mdf := &MetricsDataFrame{ Len: 0, - ExportTime: GetTimeMiliseconds(), + ExportTime: utils.GetTimeMiliseconds(), } // Generate the loop over each peer of the Metrics metricsCopy.Range(func(k, val interface{}) bool { - var v PeerMetrics - v = val.(PeerMetrics) + var v utils.PeerMetrics + v = val.(utils.PeerMetrics) mdf.PeerIds.AddItem(v.PeerId) mdf.NodeIds.AddItem(v.NodeId) mdf.UserAgent.AddItem(v.ClientType) @@ -67,6 +79,15 @@ func NewMetricsDataFrame(metricsCopy sync.Map) *MetricsDataFrame { mdf.Countries.AddItem(v.Country) mdf.Cities.AddItem(v.City) mdf.Latencies.AddItem(v.Latency) // in milliseconds + // Add connection status to the CSV + mdf.Attempted.AddItem(v.Attempted) + mdf.Succeed.AddItem(v.Succeed) + mdf.Connected.AddItem(v.Connected) + mdf.Attempts.AddItem(v.Attempts) + mdf.Errors.AddItem(v.Error) + // Metadata infomation + mdf.RequestedMetadata.AddItem(v.MetadataRequest) + mdf.SuccessMetadata.AddItem(v.MetadataSucceed) // Analyze the connections from the events connections, disconnections, connTime := AnalyzeConnectionEvents(v.ConnectionEvents, mdf.ExportTime) mdf.Connections.AddItem(connections) @@ -97,7 +118,7 @@ func (mdf MetricsDataFrame) ExportToCSV(filePath string) error { } defer csvFile.Close() // First raw of the file will be the Titles of the columns - _, err = csvFile.WriteString("Peer Id,Node Id,User Agent,Client,Version,Pubkey,Address,Ip,Country,City,Latency,Connections,Disconnections,Connected Time,Beacon Blocks,Beacon Aggregations,Voluntary Exits,Proposer Slashings,Attester Slashings,Total Messages\n") + _, err = csvFile.WriteString("Peer Id,Node Id,User Agent,Client,Version,Pubkey,Address,Ip,Country,City,Request Metadata,Success Metadata,Attempted,Succeed,Connected,Attempts,Error,Latency,Connections,Disconnections,Connected Time,Beacon Blocks,Beacon Aggregations,Voluntary Exits,Proposer Slashings,Attester Slashings,Total Messages\n") if err != nil { return fmt.Errorf("Error while Writing the Titles on the csv") } @@ -106,12 +127,14 @@ func (mdf MetricsDataFrame) ExportToCSV(filePath string) error { var csvRow string // special case for the latency lat := fmt.Sprint(mdf.Latencies.GetByIndex(idx)) - conTime := fmt.Sprintf("%.3f", mdf.ConnectedTimes.GetByIndex(idx)) + conTime := fmt.Sprintf("%.3f", mdf.ConnectedTimes.GetByIndex(idx)) csvRow = mdf.PeerIds.GetByIndex(idx).String() + "," + mdf.NodeIds.GetByIndex(idx) + "," + mdf.UserAgent.GetByIndex(idx) + "," + mdf.ClientTypes.GetByIndex(idx) + "," + mdf.ClientVersions.GetByIndex(idx) + "," + mdf.PubKeys.GetByIndex(idx) + "," + mdf.Addresses.GetByIndex(idx) + "," + mdf.Ips.GetByIndex(idx) + "," + - mdf.Countries.GetByIndex(idx) + "," + mdf.Cities.GetByIndex(idx) + "," + lat + "," + strconv.Itoa(int(mdf.Connections.GetByIndex(idx))) + "," + + mdf.Countries.GetByIndex(idx) + "," + mdf.Cities.GetByIndex(idx) + "," + strconv.FormatBool(mdf.RequestedMetadata.GetByIndex(idx)) + "," + strconv.FormatBool(mdf.SuccessMetadata.GetByIndex(idx)) + "," + strconv.FormatBool(mdf.Attempted.GetByIndex(idx)) + "," + + strconv.FormatBool(mdf.Succeed.GetByIndex(idx)) + "," + strconv.FormatBool(mdf.Connected.GetByIndex(idx)) + "," + strconv.Itoa(mdf.Attempts.GetByIndex(idx)) + "," + mdf.Errors.GetByIndex(idx) + "," + lat + "," + strconv.Itoa(int(mdf.Connections.GetByIndex(idx))) + "," + strconv.Itoa(int(mdf.Disconnections.GetByIndex(idx))) + "," + conTime + "," + strconv.Itoa(int(mdf.RBeaconBlocks.GetByIndex(idx))) + "," + strconv.Itoa(int(mdf.RBeaconAggregations.GetByIndex(idx))) + "," + - strconv.Itoa(int(mdf.RVoluntaryExits.GetByIndex(idx))) + "," + strconv.Itoa(int(mdf.RProposerSlashings.GetByIndex(idx))) + "," + strconv.Itoa(int(mdf.RAttesterSlashings.GetByIndex(idx))) + "," + strconv.Itoa(int(mdf.RTotalMessages.GetByIndex(idx))) + "\n" + strconv.Itoa(int(mdf.RVoluntaryExits.GetByIndex(idx))) + "," + strconv.Itoa(int(mdf.RProposerSlashings.GetByIndex(idx))) + "," + strconv.Itoa(int(mdf.RAttesterSlashings.GetByIndex(idx))) + "," + + strconv.Itoa(int(mdf.RTotalMessages.GetByIndex(idx))) + "\n" _, err = csvFile.WriteString(csvRow) if err != nil { return fmt.Errorf("Error while Writing the Titles on the csv") @@ -126,7 +149,7 @@ func GetMetricsDuplicate(original sync.Map) sync.Map { var newMap sync.Map // Iterate through the items on the original Map original.Range(func(k, v interface{}) bool { - cp, ok := v.(PeerMetrics) + cp, ok := v.(utils.PeerMetrics) if ok { newMap.Store(k, cp) } @@ -135,12 +158,11 @@ func GetMetricsDuplicate(original sync.Map) sync.Map { return newMap } - // Function that iterates through the peers keeping track of the client type, and versions -func (df MetricsDataFrame) AnalyzeClientType(clientname string) custom.Client{ +func (df MetricsDataFrame) AnalyzeClientType(clientname string) custom.Client { client := custom.NewClient() clicnt := 0 - versions := make(map[string]int, 0) + versions := make(map[string]int) // iterate through the peer metrics with reading the client List for idx, item := range df.ClientTypes { @@ -149,7 +171,7 @@ func (df MetricsDataFrame) AnalyzeClientType(clientname string) custom.Client{ // add the version to the map or increase the actual counter ver := df.ClientVersions.GetByIndex(idx) //x := versions[ver] - versions[ver] += 1 + versions[ver] += 1 } } // after reading the entire metrics we can generate the custom.Client struct @@ -166,4 +188,40 @@ func (df MetricsDataFrame) AnalyzeClientType(clientname string) custom.Client{ client.AddVersion(item, versions[item]) } return client -} \ No newline at end of file +} + +// Function that iterates through the peers keeping track of the client type, and versions if the peer was requested the metadata +func (df MetricsDataFrame) AnalyzeClientTypeIfMetadataRequested(clientname string) custom.Client { + client := custom.NewClient() + clicnt := 0 + versions := make(map[string]int) + + // iterate through the peer metrics with reading the client List + for idx, item := range df.ClientTypes { + if item == clientname { // peer with the same client type as the one we are searching for and metadata was requested + // check if the metadata was requested from the peer + i := df.RequestedMetadata.GetByIndex(idx) + if i { + clicnt += 1 + // add the version to the map or increase the actual counter + ver := df.ClientVersions.GetByIndex(idx) + //x := versions[ver] + versions[ver] += 1 + } + } + } + // after reading the entire metrics we can generate the custom.Client struct + client.SetTotal(clicnt) + v := make([]string, 0, len(versions)) + for val := range versions { + v = append(v, val) + } + sort.Strings(v) + for i, j := 0, len(v)-1; i < j; i, j = i+1, j-1 { + v[i], v[j] = v[j], v[i] + } + for _, item := range v { + client.AddVersion(item, versions[item]) + } + return client +} diff --git a/src/metrics/disconnection.go b/src/metrics/export/disconnection.go similarity index 93% rename from src/metrics/disconnection.go rename to src/metrics/export/disconnection.go index 8ff57f0..4a7757b 100644 --- a/src/metrics/disconnection.go +++ b/src/metrics/export/disconnection.go @@ -1,4 +1,4 @@ -package metrics +package export type DisconnectionList []int64 diff --git a/src/metrics/export/errors.go b/src/metrics/export/errors.go new file mode 100644 index 0000000..c775d1b --- /dev/null +++ b/src/metrics/export/errors.go @@ -0,0 +1,22 @@ +package export + +type ErrorList []string + +// add new item to the list +func (l *ErrorList) AddItem(newItem string) { + *l = append(*l, newItem) +} + +// get item form the list by index +func (l *ErrorList) GetByIndex(idx int) string { + return (*l)[idx] +} + +// get the array sorted by list of indexes +func (l ErrorList) GetArrayByIndexes(idxs []int) []string { + var sortedArray []string + for _, i := range idxs { + sortedArray = append(sortedArray, l[i]) + } + return sortedArray +} diff --git a/src/metrics/ip.go b/src/metrics/export/ip.go similarity index 92% rename from src/metrics/ip.go rename to src/metrics/export/ip.go index 00769e5..ce2cf62 100644 --- a/src/metrics/ip.go +++ b/src/metrics/export/ip.go @@ -1,4 +1,4 @@ -package metrics +package export type IpList []string diff --git a/src/metrics/latency.go b/src/metrics/export/latency.go similarity index 93% rename from src/metrics/latency.go rename to src/metrics/export/latency.go index b60379d..0d7218d 100644 --- a/src/metrics/latency.go +++ b/src/metrics/export/latency.go @@ -1,4 +1,4 @@ -package metrics +package export type LatencyList []float64 diff --git a/src/metrics/export/metadata-request.go b/src/metrics/export/metadata-request.go new file mode 100644 index 0000000..a1dc9b6 --- /dev/null +++ b/src/metrics/export/metadata-request.go @@ -0,0 +1,22 @@ +package export + +type RequestedMetadataList []bool + +// add new item to the list +func (l *RequestedMetadataList) AddItem(newItem bool) { + *l = append(*l, newItem) +} + +// get item form the list by index +func (l *RequestedMetadataList) GetByIndex(idx int) bool { + return (*l)[idx] +} + +// get the array sorted by list of indexes +func (l RequestedMetadataList) GetArrayByIndexes(idxs []int) []bool { + var sortedArray []bool + for _, i := range idxs { + sortedArray = append(sortedArray, l[i]) + } + return sortedArray +} diff --git a/src/metrics/node_id.go b/src/metrics/export/node_id.go similarity index 96% rename from src/metrics/node_id.go rename to src/metrics/export/node_id.go index 85b40bf..8769d5c 100644 --- a/src/metrics/node_id.go +++ b/src/metrics/export/node_id.go @@ -1,4 +1,4 @@ -package metrics +package export type NodeIdList []string diff --git a/src/metrics/peer_id.go b/src/metrics/export/peer_id.go similarity index 97% rename from src/metrics/peer_id.go rename to src/metrics/export/peer_id.go index 5b6cebc..3a528c0 100644 --- a/src/metrics/peer_id.go +++ b/src/metrics/export/peer_id.go @@ -1,4 +1,4 @@ -package metrics +package export import ( "github.com/libp2p/go-libp2p-core/peer" diff --git a/src/metrics/pubkey.go b/src/metrics/export/pubkey.go similarity index 93% rename from src/metrics/pubkey.go rename to src/metrics/export/pubkey.go index 3690ee5..ac804da 100644 --- a/src/metrics/pubkey.go +++ b/src/metrics/export/pubkey.go @@ -1,4 +1,4 @@ -package metrics +package export type PubKeyList []string diff --git a/src/metrics/rattester_slashing.go b/src/metrics/export/rattester_slashing.go similarity index 93% rename from src/metrics/rattester_slashing.go rename to src/metrics/export/rattester_slashing.go index 98e3f64..609c4d7 100644 --- a/src/metrics/rattester_slashing.go +++ b/src/metrics/export/rattester_slashing.go @@ -1,4 +1,4 @@ -package metrics +package export type RAttesterSlashingList []int64 diff --git a/src/metrics/rbeacon_aggregation.go b/src/metrics/export/rbeacon_aggregation.go similarity index 93% rename from src/metrics/rbeacon_aggregation.go rename to src/metrics/export/rbeacon_aggregation.go index 767a868..a14036e 100644 --- a/src/metrics/rbeacon_aggregation.go +++ b/src/metrics/export/rbeacon_aggregation.go @@ -1,4 +1,4 @@ -package metrics +package export type RBeaconAggregationList []int64 diff --git a/src/metrics/rbeacon_block.go b/src/metrics/export/rbeacon_block.go similarity index 93% rename from src/metrics/rbeacon_block.go rename to src/metrics/export/rbeacon_block.go index ae68235..1cc08a7 100644 --- a/src/metrics/rbeacon_block.go +++ b/src/metrics/export/rbeacon_block.go @@ -1,4 +1,4 @@ -package metrics +package export type RBeaconBlockList []int64 diff --git a/src/metrics/rproposer_slashing.go b/src/metrics/export/rproposer_slashing.go similarity index 93% rename from src/metrics/rproposer_slashing.go rename to src/metrics/export/rproposer_slashing.go index adfd5bf..ca8e064 100644 --- a/src/metrics/rproposer_slashing.go +++ b/src/metrics/export/rproposer_slashing.go @@ -1,4 +1,4 @@ -package metrics +package export type RProposerSlashingList []int64 diff --git a/src/metrics/rvoluntary_exit.go b/src/metrics/export/rvoluntary_exit.go similarity index 93% rename from src/metrics/rvoluntary_exit.go rename to src/metrics/export/rvoluntary_exit.go index 062c19b..0691108 100644 --- a/src/metrics/rvoluntary_exit.go +++ b/src/metrics/export/rvoluntary_exit.go @@ -1,4 +1,4 @@ -package metrics +package export type RVoluntaryExitList []int64 diff --git a/src/metrics/export/succeed.go b/src/metrics/export/succeed.go new file mode 100644 index 0000000..c25ea21 --- /dev/null +++ b/src/metrics/export/succeed.go @@ -0,0 +1,22 @@ +package export + +type SucceedList []bool + +// add new item to the list +func (l *SucceedList) AddItem(newItem bool) { + *l = append(*l, newItem) +} + +// get item form the list by index +func (l *SucceedList) GetByIndex(idx int) bool { + return (*l)[idx] +} + +// get the array sorted by list of indexes +func (l SucceedList) GetArrayByIndexes(idxs []int) []bool { + var sortedArray []bool + for _, i := range idxs { + sortedArray = append(sortedArray, l[i]) + } + return sortedArray +} diff --git a/src/metrics/export/success-metadata.go b/src/metrics/export/success-metadata.go new file mode 100644 index 0000000..ea3aa52 --- /dev/null +++ b/src/metrics/export/success-metadata.go @@ -0,0 +1,22 @@ +package export + +type SuccessMetadataList []bool + +// add new item to the list +func (l *SuccessMetadataList) AddItem(newItem bool) { + *l = append(*l, newItem) +} + +// get item form the list by index +func (l *SuccessMetadataList) GetByIndex(idx int) bool { + return (*l)[idx] +} + +// get the array sorted by list of indexes +func (l SuccessMetadataList) GetArrayByIndexes(idxs []int) []bool { + var sortedArray []bool + for _, i := range idxs { + sortedArray = append(sortedArray, l[i]) + } + return sortedArray +} diff --git a/src/metrics/total_messages.go b/src/metrics/export/total_messages.go similarity index 94% rename from src/metrics/total_messages.go rename to src/metrics/export/total_messages.go index c43ec73..19355b8 100644 --- a/src/metrics/total_messages.go +++ b/src/metrics/export/total_messages.go @@ -1,4 +1,4 @@ -package metrics +package export type TotalMessagesList []int64 diff --git a/src/metrics/user_agent.go b/src/metrics/export/user_agent.go similarity index 92% rename from src/metrics/user_agent.go rename to src/metrics/export/user_agent.go index aef648e..5f1a279 100644 --- a/src/metrics/user_agent.go +++ b/src/metrics/export/user_agent.go @@ -1,4 +1,4 @@ -package metrics +package export type UserAgentList []string diff --git a/src/metrics/utils.go b/src/metrics/export/utils.go similarity index 94% rename from src/metrics/utils.go rename to src/metrics/export/utils.go index eefecdd..0be33fc 100644 --- a/src/metrics/utils.go +++ b/src/metrics/export/utils.go @@ -1,13 +1,15 @@ -package metrics +package export import ( "strings" + + "github.com/protolambda/rumor/metrics/utils" ) // Connection Utils // filter the received Connection/Disconnection events generating a counter and the connected time -func AnalyzeConnectionEvents(eventList []ConnectionEvents, currentTime int64) (int64, int64, float64) { +func AnalyzeConnectionEvents(eventList []utils.ConnectionEvents, currentTime int64) (int64, int64, float64) { var startingTime int64 = 0 var finishingTime int64 = 0 // aux variables diff --git a/src/metrics/extrametrics.go b/src/metrics/extrametrics.go index 5ee89c8..b377398 100644 --- a/src/metrics/extrametrics.go +++ b/src/metrics/extrametrics.go @@ -2,169 +2,94 @@ package metrics import ( "fmt" - "os" - "strconv" "strings" - "sync" - "encoding/csv" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/protolambda/rumor/metrics/utils" ) -type ExtraMetrics struct { - Peers sync.Map -} - -// ImportExtraMetrics adds the previously experienced behaviour for each of the peers -// in the peerstore -func (em *ExtraMetrics) ImportMetrics(filePath string) (error, bool) { - // Check if file exist - if FileExists(filePath) { // if exists, read it - // get the csv of the file - csvFile, err := os.Open(filePath) - if err != nil { - return err, true - } - defer csvFile.Close() - csvLines, err := csv.NewReader(csvFile).ReadAll() - if err != nil { - fmt.Println(err) - } - for _, line := range csvLines { - id := peer.ID(line[0]) - attempted, _ := strconv.ParseBool(line[1]) - success, _ := strconv.ParseBool(line[2]) - attempts, _ := strconv.Atoi(line[3]) - erro := line[4] - if erro == "Error" { - erro = "Uncertain" - } - pem := PeerExtraMetrics{ - ID: id, - Attempted: attempted, - Succeed: success, - Attempts: attempts, - Error: erro, - } - em.Peers.Store(id, pem) - } - return nil, true - } else { - return fmt.Errorf("Not file %s was found in path", filePath), false +// AddNewAttempts adds the resuts of a new attempt over an existing peer +// increasing the attempt counter and the respective fields +func (gm *GossipMetrics) AddNewConnectionAttempt(id peer.ID, succeed bool, err string) error { + v, ok := gm.GossipMetrics.Load(id) + if !ok { // the peer was already in the sync.Map return true + return fmt.Errorf("Not peer found with that ID %s", id.String()) } -} - + // Update the counter and connection status + p := v.(utils.PeerMetrics) -// AddNewPeer adds a peer struct to the total list giving as a result a bool -// that will be true if the peer was already in the sync.Map (exists?) -func (em *ExtraMetrics) AddNewPeer(id peer.ID) bool { - _, ok := em.Peers.Load(id) - if ok { // the peer was already in the sync.Map return true - return true + if !p.Attempted { + p.Attempted = true + //fmt.Println("Original ", err) + // MIGHT be nice to try if we can change the uncertain errors for the dial backoff + if err != "" || err != "dial backoff" { + p.Error = FilterError(err) + } } - // Generate a new PeerExtraMetrics with the first attempt results - pem := PeerExtraMetrics{ - ID: id, - Attempted: false, - Succeed: false, - Attempts: 0, - Error: "None", + if succeed { + p.Succeed = succeed + p.Error = "None" } - em.Peers.Store(id, pem) - return false + p.Attempts += 1 + + // Store the new struct in the sync.Map + gm.GossipMetrics.Store(id, p) + return nil } // AddNewAttempts adds the resuts of a new attempt over an existing peer // increasing the attempt counter and the respective fields -func (em *ExtraMetrics) AddNewAttempt(id peer.ID, succeed bool, err string) error { - v, ok := em.Peers.Load(id) +func (gm *GossipMetrics) AddNewConnection(id peer.ID) error { + v, ok := gm.GossipMetrics.Load(id) if !ok { // the peer was already in the sync.Map return true return fmt.Errorf("Not peer found with that ID %s", id.String()) } // Update the counter and connection status - pem := v.(PeerExtraMetrics) - pem.NewAttempt(succeed, err) + p := v.(utils.PeerMetrics) + + p.Connected = true + // Store the new struct in the sync.Map - em.Peers.Store(id, pem) + gm.GossipMetrics.Store(id, p) return nil } // CheckIdConnected check if the given peer was already connected // returning true if it was connected before or false if wasn't -func (em *ExtraMetrics) CheckIdConnected(id peer.ID) bool { - v, ok := em.Peers.Load(id) +func (gm *GossipMetrics) CheckIfConnected(id peer.ID) bool { + v, ok := gm.GossipMetrics.Load(id) if !ok { // the peer was already in the sync.Map we didn't connect the peer -> false return false } // Check if the peer was connected - pem := v.(PeerExtraMetrics) - if pem.Succeed == true { + p := v.(utils.PeerMetrics) + if p.Succeed { return true } else { return false } } -// Function that converts the content of the ExtraMetrics struct (content of the sync.Map) -// into a CSV -// TODO: to avoid code duplication, better to generate a new package called "exporter" -// in charge of convert map[] or sync.Map into a csv/json -func (em ExtraMetrics) ExportCSV(filePath string) error { - // Marshall the metrics into a json/csv - tmpMap := make(map[string]PeerExtraMetrics) - em.Peers.Range(func(k, v interface{}) bool { - tmpMap[k.(peer.ID).String()] = v.(PeerExtraMetrics) - return true - }) - // Export the json to the given path/file - // If we want to export it in json - //embytes := json.Marshal(tmpMap) - - // to do it in csv - csvFile, err := os.Create(filePath) // Create, New file, if exist overwrite - if err != nil { - return fmt.Errorf("Error Opening the file:", filePath) - } - defer csvFile.Close() - // First raw of the file will be the Titles of the columns - _, err = csvFile.WriteString("Peer Id,Attempted,Succeed,Attempts,Error\n") - if err != nil { - return fmt.Errorf("Error while Writing the Titles on the csv") - } - - for k, v := range tmpMap { - var csvRow string - peerMetrics := v - csvRow = k + "," + strconv.FormatBool(peerMetrics.Attempted) + "," + - strconv.FormatBool(peerMetrics.Succeed) + "," + strconv.Itoa(peerMetrics.Attempts) + "," + peerMetrics.Error + "\n" - _, err = csvFile.WriteString(csvRow) - if err != nil { - return fmt.Errorf("Error while Writing the Titles on the csv") - } - } - return nil -} - -// GetConnectionsMetrics returns the analysis over the peers found in the -// ExtraMetrics. Return Values = (0)->succeed | (1)->failed | (2)->notattempted -func (em *ExtraMetrics) GetConnectionMetrics(h host.Host) (int, int, int) { +// GetConnectionsMetrics returns the analysis over the peers found in the +// ExtraMetrics. Return Values = (0)->succeed | (1)->failed | (2)->notattempted +func (gm *GossipMetrics) GetConnectionMetrics(h host.Host) (int, int, int) { totalrecorded := 0 - succeed := 0 + succeed := 0 failed := 0 notattempted := 0 // Read from the recorded ExtraMetrics the status of each peer connections - em.Peers.Range(func(key interface{}, value interface{}) bool { - em := value.(PeerExtraMetrics) + gm.GossipMetrics.Range(func(key interface{}, value interface{}) bool { + p := value.(utils.PeerMetrics) totalrecorded += 1 // Catalog each of the peers for the experienced status - if em.Attempted { - if em.Succeed { - succeed += 1 + if p.Attempted { + if p.Succeed { + succeed += 1 } else { failed += 1 } - } else { + } else { notattempted += 1 } return true @@ -173,31 +98,25 @@ func (em *ExtraMetrics) GetConnectionMetrics(h host.Host) (int, int, int) { peerList := h.Peerstore().Peers() peerstoreLen := len(peerList) notattempted = notattempted + (peerstoreLen - totalrecorded) - fmt.Println("Total Peerstore, Total Tracked, Succeed, Failed, Not Attempted") - fmt.Println(peerstoreLen, totalrecorded, succeed, failed, notattempted) - t := (succeed + failed + notattempted) - if t != peerstoreLen { - fmt.Println("Extra Metrics and Peerstore don't match", t, peerstoreLen) - } // MAYBE -> include here the error reader? return succeed, failed, notattempted } -// GetConnectionsMetrics returns the analysis over the peers found in the ExtraMetrics. +// GetConnectionsMetrics returns the analysis over the peers found in the ExtraMetrics. // Return Values = (0)->resetbypeer | (1)->timeout | (2)->dialtoself | (3)->dialbackoff | (4)->uncertain -func (em *ExtraMetrics) GetErrorCounter(h host.Host) (int, int, int, int, int) { +func (gm *GossipMetrics) GetErrorCounter(h host.Host) (int, int, int, int, int) { totalfailed := 0 dialbackoff := 0 - timeout := 0 + timeout := 0 resetbypeer := 0 dialtoself := 0 uncertain := 0 // Read from the recorded ExtraMetrics the status of each peer connections - em.Peers.Range(func(key interface{}, value interface{}) bool { - em := value.(PeerExtraMetrics) + gm.GossipMetrics.Range(func(key interface{}, value interface{}) bool { + p := value.(utils.PeerMetrics) // Catalog each of the peers for the experienced status - if em.Attempted && em.Succeed == false { // atempted and failed should have generated an error - erro := em.Error + if p.Attempted && !p.Succeed { // atempted and failed should have generated an error + erro := p.Error totalfailed += 1 switch erro { case "Connection reset by peer": @@ -207,51 +126,18 @@ func (em *ExtraMetrics) GetErrorCounter(h host.Host) (int, int, int, int, int) { case "dial to self attempted": dialtoself += 1 case "dial backoff": - dialbackoff += 1 + dialbackoff += 1 case "Uncertain": uncertain += 1 default: fmt.Println("The recorded error type doesn't match any of the error on the list", erro) } - } + } return true }) - fmt.Println("totalerrors, resetbypeer, timeout, dialtoself, dialbackoff, uncertain") - fmt.Println(totalfailed, resetbypeer, timeout, dialtoself, dialbackoff, uncertain) return resetbypeer, timeout, dialtoself, dialbackoff, uncertain } - - - - - -type PeerExtraMetrics struct { - ID peer.ID // ID of the peer - Attempted bool // If the peer has been attempted to stablish a connection - Succeed bool // If the connection attempt has been successful - Attempts int // Number of attempts done - Error string // Type of error that we detected -} - -// Funtion that updates the values of the new connection trial increasing the counter -// as the result of the connection trial -func (p *PeerExtraMetrics) NewAttempt(success bool, err string) { - if p.Attempted == false { - p.Attempted = true - //fmt.Println("Original ", err) - // MIGHT be nice to try if we can change the uncertain errors for the dial backoff - if err != "" || err != "dial backoff" { - p.Error = FilterError(err) - } - } - if success == true { - p.Succeed = success - p.Error = "None" - } - p.Attempts += 1 -} - // funtion that formats the error into a Pretty understandable (standard) way // also important to cohesionate the extra-metrics output csv func FilterError(err string) string { diff --git a/src/metrics/gossip_metrics.go b/src/metrics/gossip_metrics.go index 073cc48..de31759 100644 --- a/src/metrics/gossip_metrics.go +++ b/src/metrics/gossip_metrics.go @@ -14,22 +14,22 @@ import ( "time" "github.com/libp2p/go-libp2p-core/peer" + "github.com/protolambda/rumor/metrics/export" + "github.com/protolambda/rumor/metrics/utils" pgossip "github.com/protolambda/rumor/p2p/gossip" "github.com/protolambda/rumor/p2p/gossip/database" "github.com/protolambda/rumor/p2p/track" - // "github.com/protolambda/zrnt/eth2/beacon" ) type GossipMetrics struct { GossipMetrics sync.Map - ExtraMetrics ExtraMetrics MessageDatabase *database.MessageDatabase StartTime int64 // milliseconds } func NewGossipMetrics() GossipMetrics { gm := GossipMetrics{ - StartTime: GetTimeMiliseconds(), + StartTime: utils.GetTimeMiliseconds(), } return gm } @@ -44,29 +44,54 @@ func FileExists(name string) bool { return true } +type Checkpoint struct { + Checkpoint string `json:"checkpoint"` +} + // Import an old GossipMetrics from given file // return: - return error if there was error while reading the file // - return bool for existing file (true if there was a file to read, return false if there wasn't a file to read) -func (c *GossipMetrics) ImportMetrics(importFile string) (error, bool) { - // Check if file exist - if FileExists(importFile) { // if exists, read it +func (c *GossipMetrics) ImportMetrics(importFolder string) (error, bool) { + // Check if there is any checkpoint file in the folder + cpFile := importFolder + "/metrics/checkpoint-folder.json" + if FileExists(cpFile) { // if exists, read it // get the json of the file - jsonFile, err := os.Open(importFile) + fmt.Println("Importing Checkpoint Json:", cpFile) + cp, err := os.Open(cpFile) if err != nil { return err, true } - byteValue, err := ioutil.ReadAll(jsonFile) + cpBytes, err := ioutil.ReadAll(cp) if err != nil { return err, true } - tempMap := make(map[peer.ID]PeerMetrics, 0) - json.Unmarshal(byteValue, &tempMap) - // iterate to add the metrics from the json to the the GossipMetrics - for k, v := range tempMap { - c.GossipMetrics.Store(k, v) + cpFolder := Checkpoint{} + json.Unmarshal(cpBytes, &cpFolder) + importFile := importFolder + "/metrics/" + cpFolder.Checkpoint + "/gossip-metrics.json" + // Check if file exist + if FileExists(importFile) { // if exists, read it + // get the json of the file + fmt.Println("Importing Gossip-Metrics Json:", importFile) + jsonFile, err := os.Open(importFile) + if err != nil { + return err, true + } + byteValue, err := ioutil.ReadAll(jsonFile) + if err != nil { + return err, true + } + tempMap := make(map[peer.ID]utils.PeerMetrics) + json.Unmarshal(byteValue, &tempMap) + // iterate to add the metrics from the json to the the GossipMetrics + for k, v := range tempMap { + c.GossipMetrics.Store(k, v) + } + return nil, true + } else { + return nil, false } - return nil, true } else { + fmt.Println("NO previous Checkpoint") return nil, false } } @@ -80,78 +105,11 @@ type GossipState struct { SeenFilter bool } -// Base Struct for the topic name and the received messages on the different topics -type PeerMetrics struct { - PeerId peer.ID - NodeId string - ClientType string - Pubkey string - Addrs string - Ip string - Country string - City string - Latency float64 - - ConnectionEvents []ConnectionEvents - // Counters for the different topics - BeaconBlock MessageMetrics - BeaconAggregateProof MessageMetrics - VoluntaryExit MessageMetrics - ProposerSlashing MessageMetrics - AttesterSlashing MessageMetrics - // Variables related to the SubNets (only needed for when Shards will be implemented) -} - -func NewPeerMetrics(peerId peer.ID) PeerMetrics { - pm := PeerMetrics{ - PeerId: peerId, - NodeId: "", - ClientType: "Unknown", - Pubkey: "", - Addrs: "/ip4/127.0.0.1/0000", - Ip: "127.0.0.1", - Country: "Unknown", - City: "Unknown", - Latency: 0, - - ConnectionEvents: make([]ConnectionEvents, 1), - // Counters for the different topics - BeaconBlock: NewMessageMetrics(), - BeaconAggregateProof: NewMessageMetrics(), - VoluntaryExit: NewMessageMetrics(), - ProposerSlashing: NewMessageMetrics(), - AttesterSlashing: NewMessageMetrics(), - } - return pm -} - -func NewMessageMetrics() MessageMetrics { - mm := MessageMetrics{ - Cnt: 0, - FirstMessageTime: 0, - LastMessageTime: 0, - } - return mm -} - -// Connection event model -type ConnectionEvents struct { - ConnectionType string - TimeMili int64 -} - -// Information regarding the messages received on the beacon_lock topic -type MessageMetrics struct { - Cnt int64 - FirstMessageTime int64 - LastMessageTime int64 -} - // Function that Wraps/Marshals the content of the sync.Map to be exported as a json func (c *GossipMetrics) MarshalMetrics() ([]byte, error) { - tmpMap := make(map[string]PeerMetrics) + tmpMap := make(map[string]utils.PeerMetrics) c.GossipMetrics.Range(func(k, v interface{}) bool { - tmpMap[k.(peer.ID).String()] = v.(PeerMetrics) + tmpMap[k.(peer.ID).String()] = v.(utils.PeerMetrics) return true }) return json.Marshal(tmpMap) @@ -159,8 +117,7 @@ func (c *GossipMetrics) MarshalMetrics() ([]byte, error) { // Function that Wraps/Marshals the content of the Entire Peerstore into a json func (c *GossipMetrics) MarshalPeerStore(ep track.ExtendedPeerstore) ([]byte, error) { - var peers []peer.ID - peers = ep.Peers() + peers := ep.Peers() peerData := make(map[string]*track.PeerAllData) for _, p := range peers { peerData[p.String()] = ep.GetAllData(p) @@ -187,6 +144,22 @@ func GetFullAddress(multiAddrs []string) string { return address } +// Function that resets to 0 the connections/disconnections, and message counters +// this way the Ram Usage gets limited (up to ~10k nodes for a 12h-24h ) +// NOTE: Keep in mind that the peers that we ended up connected to, will experience a weid connection time +// TODO: Fix peers that stayed connected to the tool +func (c *GossipMetrics) ResetDynamicMetrics() { + fmt.Println("Reseting Dynamic Metrics in Peer") + // Iterate throught the peers in the metrics, restarting connection events and messages + c.GossipMetrics.Range(func(key interface{}, value interface{}) bool { + p := value.(utils.PeerMetrics) + p.ResetDynamicMetrics() + c.GossipMetrics.Store(key, p) + return true + }) + fmt.Println("Finished Reseting Dynamic Metrics") +} + // Function that iterates through the received peers and fills the missing information func (c *GossipMetrics) FillMetrics(ep track.ExtendedPeerstore) { // to prevent the Filler from crashing (the url-service only accepts 45req/s) @@ -196,7 +169,7 @@ func (c *GossipMetrics) FillMetrics(ep track.ExtendedPeerstore) { // Read the info that we have from him p, ok := c.GossipMetrics.Load(key) if ok { - peerMetrics := p.(PeerMetrics) + peerMetrics := p.(utils.PeerMetrics) peerData := ep.GetAllData(peerMetrics.PeerId) //fmt.Println("Filling Metrics of Peer:", peerMetrics.PeerId.String()) if len(peerMetrics.NodeId) == 0 { @@ -214,7 +187,7 @@ func (c *GossipMetrics) FillMetrics(ep track.ExtendedPeerstore) { peerMetrics.Pubkey = peerData.Pubkey } - if len(peerMetrics.Addrs) == 0 || peerMetrics.Addrs == "/ip4/127.0.0.1/0000" { + if len(peerMetrics.Addrs) == 0 || peerMetrics.Addrs == "/ip4/127.0.0.1/0000" || peerMetrics.Addrs == "/ip4/127.0.0.1/9000" { address := GetFullAddress(peerData.Addrs) //fmt.Println("Addrs empty", peerMetrics.Addrs, "Adding Addrs:", address) peerMetrics.Addrs = address @@ -256,7 +229,7 @@ func (c *GossipMetrics) FillMetrics(ep track.ExtendedPeerstore) { } // Function that Exports the entire Metrics to a .json file (lets see if in the future we can add websockets or other implementations) -func (c *GossipMetrics) ExportMetrics(filePath string, peerstorePath string, csvPath string, extraMetricsPath string, ep track.ExtendedPeerstore) error { +func (c *GossipMetrics) ExportMetrics(filePath string, peerstorePath string, csvPath string, ep track.ExtendedPeerstore) error { metrics, err := c.MarshalMetrics() if err != nil { fmt.Println("Error Marshalling the metrics") @@ -278,16 +251,10 @@ func (c *GossipMetrics) ExportMetrics(filePath string, peerstorePath string, csv } // Generate the MetricsDataFrame of the Current Metrics // Export the metrics to the given CSV file - mdf := NewMetricsDataFrame(c.GossipMetrics) + mdf := export.NewMetricsDataFrame(c.GossipMetrics) err = mdf.ExportToCSV(csvPath) if err != nil { - fmt.Printf("Error:", err) - return err - } - // Export the extra metrics to a csv - err = c.ExtraMetrics.ExportCSV(extraMetricsPath) - if err != nil { - fmt.Printf("Error exporting the Extra metrics:", err) + fmt.Println("Error:", err) return err } return nil @@ -376,27 +343,33 @@ func getIpAndLocationFromAddrs(multiAddrs string) (ip string, country string, ci // Add new peer with all the information from the peerstore to the metrics db // returns: Alredy (Bool) -func (c *GossipMetrics) AddNewPeer(peerId peer.ID) { +func (c *GossipMetrics) AddNewPeer(peerId peer.ID) bool { _, ok := c.GossipMetrics.Load(peerId) if !ok { // We will just add the info that we have (the peerId) - peerMetrics := NewPeerMetrics(peerId) + peerMetrics := utils.NewPeerMetrics(peerId) // Include it to the Peer DB c.GossipMetrics.Store(peerId, peerMetrics) // return that wasn't already on the peerstore + return false } + return true } // Add a connection Event to the given peer func (c *GossipMetrics) AddConnectionEvent(peerId peer.ID, connectionType string) { - newConnection := ConnectionEvents{ + newConnection := utils.ConnectionEvents{ ConnectionType: connectionType, - TimeMili: GetTimeMiliseconds(), + TimeMili: utils.GetTimeMiliseconds(), } + pMetrics, ok := c.GossipMetrics.Load(peerId) if ok { - peerMetrics := pMetrics.(PeerMetrics) + peerMetrics := pMetrics.(utils.PeerMetrics) peerMetrics.ConnectionEvents = append(peerMetrics.ConnectionEvents, newConnection) + if connectionType == "Connection" { + peerMetrics.Connected = true + } c.GossipMetrics.Store(peerId, peerMetrics) } else { // Might be possible to add @@ -404,40 +377,27 @@ func (c *GossipMetrics) AddConnectionEvent(peerId peer.ID, connectionType string } } -// Increments the counter of the topic -func (c *MessageMetrics) IncrementCnt() int64 { - c.Cnt++ - return c.Cnt -} - -// Stamps linux_time(millis) on the FirstMessageTime/LastMessageTime from given args: time (int64), flag string("first"/"last") -func (c *MessageMetrics) StampTime(flag string) { - unixMillis := GetTimeMiliseconds() - - switch flag { - case "first": - c.FirstMessageTime = unixMillis - case "last": - c.LastMessageTime = unixMillis - default: - fmt.Println("Metrics Package -> StampTime.flag wrongly parsed") +// Add a connection Event to the given peer +func (c *GossipMetrics) AddMetadataEvent(peerId peer.ID, success bool) { + pMetrics, ok := c.GossipMetrics.Load(peerId) + if ok { + peerMetrics := pMetrics.(utils.PeerMetrics) + peerMetrics.MetadataRequest = true + if success { + peerMetrics.MetadataSucceed = true + } + c.GossipMetrics.Store(peerId, peerMetrics) + } else { + // Might be possible to add + fmt.Println("Counld't add Event, Peer is not in the list") } } -func GetTimeMiliseconds() int64 { - now := time.Now() - //secs := now.Unix() - nanos := now.UnixNano() - millis := nanos / 1000000 - - return millis -} - // Function that Manages the metrics updates for the incoming messages func (c *GossipMetrics) IncomingMessageManager(peerId peer.ID, topicName string) error { pMetrics, _ := c.GossipMetrics.Load(peerId) //fmt.Println("the loaded", pMetrics) - peerMetrics := pMetrics.(PeerMetrics) + peerMetrics := pMetrics.(utils.PeerMetrics) messageMetrics, err := GetMessageMetrics(&peerMetrics, topicName) if err != nil { return errors.New("Topic Name no supported") @@ -455,7 +415,7 @@ func (c *GossipMetrics) IncomingMessageManager(peerId peer.ID, topicName string) return nil } -func GetMessageMetrics(c *PeerMetrics, topicName string) (mesMetr *MessageMetrics, err error) { +func GetMessageMetrics(c *utils.PeerMetrics, topicName string) (mesMetr *utils.MessageMetrics, err error) { // All this could be inside a different function switch topicName { case pgossip.BeaconBlock: diff --git a/src/metrics/ping-metadata.go b/src/metrics/ping-metadata.go new file mode 100644 index 0000000..3d696c9 --- /dev/null +++ b/src/metrics/ping-metadata.go @@ -0,0 +1,40 @@ +package metrics + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/protolambda/rumor/control/actor/base" + "github.com/protolambda/rumor/control/actor/flags" + "github.com/protolambda/rumor/control/actor/peer/metadata" + "github.com/protolambda/rumor/p2p/rpc/reqresp" + "github.com/protolambda/rumor/p2p/track" +) + +var timeout time.Duration = 5 * time.Second + +func PollPeerMetadata(p peer.ID, base *base.Base, peerMetadataState *metadata.PeerMetadataState, store track.ExtendedPeerstore, gm *GossipMetrics) { + // apply timeout to each poll target in this round + reqCtx, _ := context.WithTimeout(context.Background(), timeout) + + go func(peerID peer.ID) { + pingCmd := &metadata.PeerMetadataPingCmd{ + Base: base, + PeerMetadataState: peerMetadataState, + Store: store, + Timeout: timeout, + Compression: flags.CompressionFlag{Compression: reqresp.SnappyCompression{}}, + Update: true, + ForceUpdate: true, + UpdateTimeout: timeout, + MaxTries: uint64(2), + PeerID: flags.PeerIDFlag{PeerID: peerID}, + } + if err := pingCmd.Run(reqCtx); err != nil { + gm.AddMetadataEvent(p, false) + } else { + gm.AddMetadataEvent(p, true) + } + }(p) +} diff --git a/src/metrics/utils/connectionevents.go b/src/metrics/utils/connectionevents.go new file mode 100644 index 0000000..c11bb85 --- /dev/null +++ b/src/metrics/utils/connectionevents.go @@ -0,0 +1,20 @@ +package utils + +import ( + "time" +) + +// Connection event model +type ConnectionEvents struct { + ConnectionType string + TimeMili int64 +} + +func GetTimeMiliseconds() int64 { + now := time.Now() + //secs := now.Unix() + nanos := now.UnixNano() + millis := nanos / 1000000 + + return millis +} diff --git a/src/metrics/utils/messagemetrics.go b/src/metrics/utils/messagemetrics.go new file mode 100644 index 0000000..e58998b --- /dev/null +++ b/src/metrics/utils/messagemetrics.go @@ -0,0 +1,39 @@ +package utils + +import "fmt" + +// Information regarding the messages received on the beacon_lock topic +type MessageMetrics struct { + Cnt int64 + FirstMessageTime int64 + LastMessageTime int64 +} + +func NewMessageMetrics() MessageMetrics { + mm := MessageMetrics{ + Cnt: 0, + FirstMessageTime: 0, + LastMessageTime: 0, + } + return mm +} + +// Increments the counter of the topic +func (c *MessageMetrics) IncrementCnt() int64 { + c.Cnt++ + return c.Cnt +} + +// Stamps linux_time(millis) on the FirstMessageTime/LastMessageTime from given args: time (int64), flag string("first"/"last") +func (c *MessageMetrics) StampTime(flag string) { + unixMillis := GetTimeMiliseconds() + + switch flag { + case "first": + c.FirstMessageTime = unixMillis + case "last": + c.LastMessageTime = unixMillis + default: + fmt.Println("Metrics Package -> StampTime.flag wrongly parsed") + } +} diff --git a/src/metrics/utils/peermetrics.go b/src/metrics/utils/peermetrics.go new file mode 100644 index 0000000..80ffa46 --- /dev/null +++ b/src/metrics/utils/peermetrics.go @@ -0,0 +1,78 @@ +package utils + +import ( + "github.com/libp2p/go-libp2p-core/peer" +) + +// Base Struct for the topic name and the received messages on the different topics +type PeerMetrics struct { + PeerId peer.ID + NodeId string + ClientType string + Pubkey string + Addrs string + Ip string + Country string + City string + Latency float64 + + Attempted bool // If the peer has been attempted to stablish a connection + Succeed bool // If the connection attempt has been successful + Connected bool // If the peer was at any point connected by the crawler (keep count of incoming dials) + Attempts int // Number of attempts done + Error string // Type of error that we detected + + MetadataRequest bool // If the peer has been attempted to request its metadata + MetadataSucceed bool // If the peer has been successfully requested its metadata + + ConnectionEvents []ConnectionEvents + // Counters for the different topics + BeaconBlock MessageMetrics + BeaconAggregateProof MessageMetrics + VoluntaryExit MessageMetrics + ProposerSlashing MessageMetrics + AttesterSlashing MessageMetrics + // Variables related to the SubNets (only needed for when Shards will be implemented) +} + +func NewPeerMetrics(peerId peer.ID) PeerMetrics { + pm := PeerMetrics{ + PeerId: peerId, + NodeId: "", + ClientType: "Unknown", + Pubkey: "", + Addrs: "/ip4/127.0.0.1/0000", + Ip: "127.0.0.1", + Country: "Unknown", + City: "Unknown", + Latency: 0, + + Attempted: false, + Succeed: false, + Connected: false, + Attempts: 0, + Error: "None", + + MetadataRequest: false, + MetadataSucceed: false, + + ConnectionEvents: make([]ConnectionEvents, 0), + // Counters for the different topics + BeaconBlock: NewMessageMetrics(), + BeaconAggregateProof: NewMessageMetrics(), + VoluntaryExit: NewMessageMetrics(), + ProposerSlashing: NewMessageMetrics(), + AttesterSlashing: NewMessageMetrics(), + } + return pm +} + +func (pm *PeerMetrics) ResetDynamicMetrics() { + pm.Attempts = 0 + pm.ConnectionEvents = make([]ConnectionEvents, 0) + pm.BeaconBlock = NewMessageMetrics() + pm.BeaconAggregateProof = NewMessageMetrics() + pm.VoluntaryExit = NewMessageMetrics() + pm.ProposerSlashing = NewMessageMetrics() + pm.AttesterSlashing = NewMessageMetrics() +} diff --git a/src/sh/file.go b/src/sh/file.go index ae2010b..2620a34 100644 --- a/src/sh/file.go +++ b/src/sh/file.go @@ -2,13 +2,14 @@ package sh import ( "context" + "os" + "time" + "github.com/protolambda/rumor/control" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "mvdan.cc/sh/v3/interp" "mvdan.cc/sh/v3/syntax" - "os" - "time" ) func FileCmd() *cobra.Command {