Leveraging big data for bitcoin price prediction.

This is one of my favourite projects so far. It was inspired by my academic involvement with distributed systems. The dataset records bitcoin transactions spanning several years, with columns such as wallet ID, amount, and so on.

I will try to upload the code to GitHub as well. Do email me at surendersampath@outlook.com if you need more clarification or help.

1. TIME ANALYSIS ON BITCOIN DATA

Create a bar plot showing the number of transactions which occurred every month between the start and end of the dataset. What do you notice about the overall trend in the utilization of bitcoin?

Please find below the bar plot of the number of transactions that occurred every month.

On analyzing the overall trend, the following observations are made.

    1. The number of transactions shows that bitcoin utilization has grown over the period. However, this growth cannot be considered purely linear.
    2. The highest number of transactions was recorded in September 2014, followed by a significant fall in December 2014.

//task1.py
from mrjob.job import MRJob
import re
import time

#This declares the class CW1 for Part A, which extends the MRJob format.
class CW1(MRJob):

    #The mapper converts each transaction's epoch timestamp into a "Month-Year" key.
    def mapper(self, _, line):
        fields = line.split(",")
        try:
            if (len(fields) == 5):
                time_epoch = int(fields[2])
                month = time.strftime("%b", time.gmtime(time_epoch)) #returns month name
                year = time.strftime("%Y", time.gmtime(time_epoch))  #returns year
                yield (month + "-" + year, 1) #concatenates month & year
        except:
            pass #just ignore the line, as it was malformed or not in the required format

    #The combiner receives a value of 1 for each occurrence of a month-year key
    #and pre-aggregates the sum locally.
    def combiner(self, key, values):
        yield (key, sum(values))

    #The reducer performs the same aggregation across all mapper/combiner outputs.
    def reducer(self, key, values):
        yield (key, sum(values))

#This part of the script tells MRJob to actually run the defined MapReduce job.
#Note that CW1 is the name of the class.
if __name__ == '__main__':
    CW1.run()
Working :

The mapper function takes the time column (fields[2]) and converts the timestamp from epoch format to a readable one. The month and year are concatenated into a string, which forms the key. The combiner and reducer then sum up the corresponding values per key.
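
As a quick sanity check, the key-building step can be tried locally on a single line (a minimal sketch; the sample line and its column layout are hypothetical, with the epoch timestamp assumed to sit in the third of five columns):

#Hypothetical sample line - the third column is assumed to hold the epoch timestamp
import time
sample = "tx_hash,wallet_id,1410000000,amount,fee"
fields = sample.split(",")
key = time.strftime("%b", time.gmtime(int(fields[2]))) + "-" + time.strftime("%Y", time.gmtime(int(fields[2])))
print(key)  #prints "Sep-2014"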

2. TOP TEN DONORS

The next objective is to obtain the top 10 donors over the whole dataset for the Wikileaks bitcoin
address: {1HB5XMLmzFVj8ALj6mfBsbifRoD4miY36v}. Is there any information on who these wallets belong to? Can you work out how much was being donated if converted into pounds?

It is practically impossible to trace the owner of a wallet from its wallet address alone. Bitcoin is pseudo-anonymous: exchanges such as Coinbase can only link addresses to their own registered users, and that information is not obtainable without government intervention.

Please find the top ten donors and equivalent value in British pounds. The output is derived from the RDD ‘inpounds’.


//task2.py

import pyspark
import re
#COURSEWORK - PART B
sc = pyspark.SparkContext()
#Creating RDDs for vout.csv and vin.csv
vout = sc.textFile("/data/bitcoin/vout.csv/").map(lambda s: s.split(','))
vin = sc.textFile("/data/bitcoin/vin.csv/").map(lambda s: s.split(','))
#Filtering for the WikiLeaks receiver address - fields are (hash, value, n, publicKey)
vout_temp = vout.filter(lambda line: line[3] == "{1HB5XMLmzFVj8ALj6mfBsbifRoD4miY36v}") \
                .map(lambda x: (x[0], (x[1], x[2], x[3]))).collectAsMap()
#Broadcasting the filtered vout so a copy of the data is available on all nodes
vout_dict = sc.broadcast(vout_temp)
#Emitting the vin fields as (txid, (tx_hash, vout_index))
vinmap = vin.map(lambda x: (x[0], (x[1], x[2])))
#Performing a replicated join by checking each vin txid against the transaction
#hashes in the broadcast vout dictionary; matching rows are kept
join = vinmap.filter(lambda n: n[0] in vout_dict.value)
#Emitting only the required fields ((tx_hash, vout_index), 1), re-arranged to match
#the dictionary format, which avoids carrying long unwanted pairs
joinmap = join.map(lambda x: ((x[1][0], x[1][1]), 1)).collectAsMap()
#Broadcasting the joined map to enable faster lookups
joinmap_broad = sc.broadcast(joinmap)
#Emitting only the required fields from vout: ((hash, n), (publicKey, value))
voutmap = vout.map(lambda x: ((x[0], x[2]), (x[3], x[1])))
#As vin only references the previous transaction, its previous outputs are joined
#back to the original vout to recover the donors' public keys
second_join = voutmap.filter(lambda n: n[0] in joinmap_broad.value)
#Selecting the required fields after the second join: (publicKey, value)
pairs = second_join.map(lambda x: (x[1][0], float(x[1][1])))
#Summing the bitcoins per donor
totals = pairs.reduceByKey(lambda a, b: a + b)
#Sorting the list of donors in descending order and keeping the top ten
top10 = sc.parallelize(totals.takeOrdered(10, key=lambda x: -x[1]))
#Converting the bitcoin amounts into their equivalent pound value and saving to a text file
inpounds = top10.map(lambda x: (x[0], "£" + str(x[1] * 3291))).saveAsTextFile("top10-pounds")
#NOTE: 1 BTC = £3,291.00 - as on 28/11/2018

WORKING :

  1. The vout dataset is filtered for the WikiLeaks wallet address and malformed lines are removed. The fields are hash, value, n, public key.
  2. Vout, as a dictionary, is broadcast across all nodes. The txid, tx_hash and v_out from vin are filtered for only the corresponding values present in the vout dictionary (this broadcast-join pattern is sketched in miniature below).
  3. A join is performed on the hash value of vout and the txid of vin, and the matched records are yielded.
  4. In the second join, vin is rejoined with the original vout file.
  5. The public_key and value are then emitted from the matched records to the reducer to calculate the total donation per individual public key.
  6. The list is sorted in descending order to obtain the top 10 donors.
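
The replicated (broadcast) join used in steps 2 and 4 can be illustrated in miniature with toy data (a sketch only; the keys and values below are made up):

#A minimal sketch of the replicated join pattern with made-up data
small = sc.parallelize([("h1", "x")]).collectAsMap()     #small side collected to the driver
small_b = sc.broadcast(small)                            #copied to every worker node
big = sc.parallelize([("h1", 1), ("h2", 2), ("h1", 3)])  #large side stays distributed
joined = big.filter(lambda kv: kv[0] in small_b.value)   #map-side filter, no shuffle needed
print(joined.collect())                                  #[('h1', 1), ('h1', 3)]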

3. DATA EXPLORATION

The final part of the project requires you to explore the data and perform some analysis of your choosing. This part is intended to be more open-ended than the first two. These tasks may be completed in either MRJob or Spark, and you may make use of any MapReduce/Spark libraries such as Mahout, MLlib (for machine learning) and GraphX for graph analysis.

APPROACH :

I’ve decided to implement a bitcoin price prediction model using linear regression. The goal is to predict bitcoin prices over a certain period using historical data and compare them with the actual prices that occurred. This way we can understand the volatility of bitcoin and study whether our model was able to capture it.

Creating dataframe for blocks.csv

# ------------------------------BLOCKS SCHEMA------------------------------
#To create a dataframe on blocks.csv
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
blocks = sc.textFile("/data/bitcoin/blocks.csv")
parts = blocks.map(lambda l: l.split(","))
#Naming the columns and choosing fields
blocks = parts.map(lambda p: Row(height=str(p[0]), hash=str(p[1]), time=str(p[2]),
                                 difficulty=str(p[3])))
schemaBlocks = sqlContext.createDataFrame(blocks)
schemaBlocks.registerTempTable("blocks") #TableName
#SQL command to remove the header row
t1 = sqlContext.sql("SELECT * from blocks where difficulty != \"difficulty\"")
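
To inspect the result, the dataframe can be printed directly (a minimal usage example; the exact call that produced the screenshot below is an assumption):

t1.show(5)  #prints the first few rows of the blocks dataframe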

A preview of the created dataframe:

[Screenshot: sample rows of the blocks dataframe]

Creating dataframe for transactions.csv

# ------------------------------TRANSACTIONS SCHEMA------------------------------
trans = sc.textFile("/data/bitcoin/transactions.csv")
parts = trans.map(lambda l: l.split(","))
trans = parts.map(lambda p: Row(tx_hash=str(p[0]), block_hash=str(p[1]), time=str(p[2]),
                                tx_in_count=str(p[3]), tx_out_count=str(p[4])))
schemaTrans = sqlContext.createDataFrame(trans)
schemaTrans.registerTempTable("trans") #TableName
#SQL command to remove the header row
t2 = sqlContext.sql("SELECT * from trans where tx_hash != \"tx_hash\"")


[Screenshot: sample rows of the transactions dataframe]

Now, to apply machine learning, I’ve chosen to gather additional data from the following source.

https://datamarket.com/data/set/4aq9/various-bitcoin-currency-statistics#!ds=4aq9!7ajt=l&display=line

The dataset (bitcoindata.csv) from the above source contains columns such as date, difficulty, market price (USD), and number of transactions, which are similar to the ones found in blocks.csv & transactions.csv.
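
For comparison, a similar per-day transaction count could in principle be derived from transactions.csv itself (a sketch only, reusing the column layout assumed above and skipping the header row):

#A sketch: daily transaction counts computed from transactions.csv
import time
daily_tx = (sc.textFile("/data/bitcoin/transactions.csv")
              .map(lambda l: l.split(","))
              .filter(lambda p: len(p) > 2 and p[2].isdigit())  #skip header/malformed rows
              .map(lambda p: (time.strftime("%Y-%m-%d", time.gmtime(int(p[2]))), 1))
              .reduceByKey(lambda a, b: a + b))
print(daily_tx.take(5))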

A sample from bitcoindata.csv is shown below.

[Screenshot: sample rows from bitcoindata.csv]

A preview of the dataframe after loading the source into HDFS:

bitdata = sc.textFile("/user/ss388/input/bitcoindata.csv")
parts = bitdata.map(lambda l: l.split(","))
bitdata = parts.map(lambda p: Row(difficulty=float(p[1]), hash_rate=float(p[2]),
                                  price=float(p[3]), transactions_count=float(p[4])))
schemaBitdata = sqlContext.createDataFrame(bitdata)
schemaBitdata.registerTempTable("bitdata") #TableName
t3 = sqlContext.sql("SELECT * from bitdata")
t5 = sqlContext.sql("SELECT * from bitdata ORDER BY price DESC")
[Screenshot: sample rows of the bitdata dataframe]

By using the PySpark ML library, we can apply a machine learning algorithm to predict a chosen parameter.

With regard to our dataset, to predict the price, the fields difficulty, hash rate and number of transactions are chosen as independent features, with the market price (USD) as the label.

Hash Rate: The estimated number of Giga hashes per second the bitcoin network is performing.

Difficulty: Indicates how difficult it is to find a new block compared to the easiest it has ever been.

Number of Transactions: The total number of unique bitcoin transactions per day.

IMPLEMENTATION:

Summary statistics of bitcoin data

#importing pandas to make use of its data analysis tools
import pandas as pd
t3.describe().toPandas().transpose()


An extra column called features is created, combining the independent variables into the single vector column that the model requires as input. The label is the dependent variable.

[Screenshot: dataframe with the assembled features column and the price label]

#Assembling the independent features into a single vector column
from pyspark.ml.feature import VectorAssembler #to merge multiple columns into a vector column
#Input columns here are the independent features
vectorAssembler = VectorAssembler(inputCols=['difficulty', 'hash_rate', 'transactions_count'],
                                  outputCol='features')
vhouse_bitdata = vectorAssembler.transform(t3)
vhouse_bitdata = vhouse_bitdata.select(['features', 'price']) #Features & label columns
vhouse_bitdata.show(10)

Now we provide the above data to our model. Before the linear regression model can make predictions, it first needs to be trained on our data.

The below code randomly splits the data and feeds 70 percent of it for training and the remaining 30 percent for prediction.
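
The original code appears only in the screenshots below; a minimal sketch of that step, assuming the default LinearRegression settings from pyspark.ml, would look like this:

#A sketch of the train/test split and model fit (split ratio from the text, seed is an assumption)
from pyspark.ml.regression import LinearRegression
train_data, test_data = vhouse_bitdata.randomSplit([0.7, 0.3], seed=42)  #70/30 split
lr = LinearRegression(featuresCol='features', labelCol='price')
lr_model = lr.fit(train_data)                #train on the 70 percent portion
predictions = lr_model.transform(test_data)  #predict on the remaining 30 percent
predictions.select('prediction', 'price', 'features').show(5)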

[Screenshots: model training output and predicted vs. actual price values]

How far into the future from our subset end date (September 2013) is your model accurate?

The obtained ROC score is 0.571656. A receiver operating characteristic (ROC) score of 1 would mean that predictions are accurate 100% of the time, while a score of zero would mean the classifier is perfectly incorrect.

The AUC can happen to be < 0.5 if the train and test splits end up containing fundamentally different patterns. This might happen when one class is more common in the train set than in the test set, or if the patterns in each set had systematically different intercepts that went unnoticed.

Lastly, it could also have been a purely random outcome: the classifier may be at chance level in the long run but happened to get “unlucky” with our test sample (i.e. a few more errors than successes). In this case, however, the value is relatively close to 0.5 (how close depends on the number of data points).

How does the volatility of bitcoin prices affect the effectiveness of your model?

By closely comparing the real data with the predicted values, there are several intervals where the predicted values almost match the real values. But in most cases, the difference is significant enough to indicate that several more independent features associated with the price were not taken into consideration while creating the model.

Including more features such as market capitalization, miners’ revenue, USD exchange trade volume, number of wallet users, and percentage of transaction volume could have resulted in a better ROC score and more accurate predictions using the same model, as sketched below.
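
A sketch of how those extra features could be wired into the same pipeline, assuming the additional columns (the names here are hypothetical) were first loaded into the dataframe:

#Hypothetical extra feature columns added to the assembler (column names are assumptions)
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(
    inputCols=['difficulty', 'hash_rate', 'transactions_count',
               'market_cap', 'miners_revenue', 'trade_volume_usd'],
    outputCol='features')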
