Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
Merge pull request #193 from cybercongress/172_btc_mempool_dump
Browse files Browse the repository at this point in the history
Setup bitcoin mempool transaction dump
  • Loading branch information
hleb-albau committed May 28, 2018
2 parents 237c065 + b3a5a6d commit c56f604
Show file tree
Hide file tree
Showing 30 changed files with 1,432 additions and 653 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ buildscript {
// tests
junitVersion = "5.2.0"
junitPlatformVersion = "1.2.0"
mockitoVersion = "2.1.0"
mockitoVersion = "2.8.9"
mockitoKotlinVersion = "1.5.0"
assertjVersion = "3.9.0"

Expand Down Expand Up @@ -172,6 +172,7 @@ subprojects {
exclude 'org.jetbrains.kotlin:kotlin-stdlib'
}
dependency("io.projectreactor:reactor-core:$reactorVersion")
dependency("io.projectreactor:reactor-test:$reactorVersion")

dependency("io.micrometer:micrometer-core:$micrometerVersion")
dependency("io.micrometer:micrometer-registry-prometheus:$micrometerVersion")
Expand All @@ -184,6 +185,7 @@ subprojects {
testCompile("org.mockito:mockito-core")
testCompile("com.nhaarman:mockito-kotlin")
testCompile("org.assertj:assertj-core")
testCompile("io.projectreactor:reactor-test")
testRuntime("org.junit.jupiter:junit-jupiter-engine")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.datastax.driver.core.Cluster
import com.datastax.driver.extras.codecs.jdk8.InstantCodec
import fund.cyber.cassandra.bitcoin.repository.BitcoinContractSummaryRepository
import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockRepository
import fund.cyber.cassandra.bitcoin.repository.BitcoinContractTxRepository
import fund.cyber.cassandra.bitcoin.repository.BitcoinTxRepository
import fund.cyber.cassandra.bitcoin.repository.PageableBitcoinContractMinedBlockRepository
import fund.cyber.cassandra.bitcoin.repository.PageableBitcoinContractTxRepository
Expand Down Expand Up @@ -162,7 +163,9 @@ class BitcoinRepositoriesConfiguration : InitializingBean {

val contractRepository = reactiveRepositoryFactory
.getRepository(BitcoinContractSummaryRepository::class.java)
val contractTxRepository = repositoryFactory
val contractTxRepository = reactiveRepositoryFactory
.getRepository(BitcoinContractTxRepository::class.java)
val pageableContractTxRepository = repositoryFactory
.getRepository(PageableBitcoinContractTxRepository::class.java)
val contractBlockRepository = repositoryFactory
.getRepository(PageableBitcoinContractMinedBlockRepository::class.java)
Expand All @@ -176,8 +179,10 @@ class BitcoinRepositoriesConfiguration : InitializingBean {
beanFactory.registerSingleton("${repositoryPrefix}txRepository", txRepository)

beanFactory.registerSingleton("${repositoryPrefix}contractRepository", contractRepository)
beanFactory.registerSingleton("${repositoryPrefix}contractTxRepository",
contractTxRepository)
beanFactory.registerSingleton("${repositoryPrefix}pageableContractTxRepository",
contractTxRepository)
pageableContractTxRepository)
beanFactory.registerSingleton("${repositoryPrefix}pageableContractBlockRepository",
contractBlockRepository)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,28 @@ data class CqlBitcoinBlockTxPreview(

@Table("block")
data class CqlBitcoinBlock(
@PrimaryKey val number: Long,
val hash: String,
@Column("miner_contract_hash") val minerContractHash: String,
@Column("block_reward") val blockReward: BigDecimal,
@Column("tx_fees") val txFees: BigDecimal,
@Column("coinbase_data") val coinbaseData: String,
val timestamp: Instant,
val nonce: Long,
val merkleroot: String,
val size: Int,
val version: Int,
val weight: Int,
val bits: String,
val difficulty: BigInteger,
@Column("tx_number") val txNumber: Int,
@Column("total_outputs_value") val totalOutputsValue: String
@PrimaryKey val number: Long,
val hash: String,
@Column("parent_hash") val parentHash: String,
@Column("miner_contract_hash") val minerContractHash: String,
@Column("block_reward") val blockReward: BigDecimal,
@Column("tx_fees") val txFees: BigDecimal,
@Column("coinbase_data") val coinbaseData: String,
val timestamp: Instant,
val nonce: Long,
val merkleroot: String,
val size: Int,
val version: Int,
val weight: Int,
val bits: String,
val difficulty: BigInteger,
@Column("tx_number") val txNumber: Int,
@Column("total_outputs_value") val totalOutputsValue: String
) : CqlBitcoinItem {

constructor(block: BitcoinBlock) : this(
number = block.height, hash = block.hash, minerContractHash = block.minerContractHash,
number = block.height, hash = block.hash, parentHash = block.parentHash,
minerContractHash = block.minerContractHash,
blockReward = block.blockReward, txFees = block.txFees, coinbaseData = block.coinbaseData,
timestamp = block.time, nonce = block.nonce, bits = block.bits, merkleroot = block.merkleroot,
size = block.size, version = block.version, weight = block.weight, difficulty = block.difficulty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package fund.cyber.cassandra.bitcoin.model
import fund.cyber.search.model.bitcoin.BitcoinTx
import fund.cyber.search.model.bitcoin.BitcoinTxIn
import fund.cyber.search.model.bitcoin.BitcoinTxOut
import fund.cyber.search.model.bitcoin.SignatureScript
import org.springframework.data.cassandra.core.mapping.Column
import org.springframework.data.cassandra.core.mapping.PrimaryKey
import org.springframework.data.cassandra.core.mapping.Table
Expand All @@ -19,6 +20,7 @@ data class CqlBitcoinTx(
@Column("block_number") val blockNumber: Long,
@Column("block_hash") val blockHash: String?,
val coinbase: String? = null,
@Column("first_seen_time") val firstSeenTime: Instant,
@Column("block_time") val blockTime: Instant?,
val size: Int,
val fee: String,
Expand All @@ -30,7 +32,7 @@ data class CqlBitcoinTx(

constructor(tx: BitcoinTx) : this(
hash = tx.hash, blockNumber = tx.blockNumber, blockHash = tx.blockHash, coinbase = tx.coinbase,
blockTime = tx.blockTime, size = tx.size, fee = tx.fee.toString(),
blockTime = tx.blockTime, size = tx.size, fee = tx.fee.toString(), firstSeenTime = tx.firstSeenTime,
totalInput = tx.totalInputsAmount.toString(), totalOutput = tx.totalOutputsAmount.toString(),
ins = tx.ins.map { txIn -> CqlBitcoinTxIn(txIn) }, outs = tx.outs.map { txOut -> CqlBitcoinTxOut(txOut) }
)
Expand All @@ -45,14 +47,15 @@ data class CqlBitcoinTx(
data class CqlBitcoinTxIn(
val contracts: List<String>,
val amount: BigDecimal,
val asm: String,
val scriptSig: CqlBitcoinSignatureScript,
val txinwitness: List<String>,
@Column("tx_hash") val txHash: String,
@Column("tx_out") val txOut: Int
) {

constructor(txIn: BitcoinTxIn) : this(
contracts = txIn.contracts, amount = txIn.amount, asm = txIn.asm,
txHash = txIn.txHash, txOut = txIn.txOut
contracts = txIn.contracts, amount = txIn.amount, scriptSig = CqlBitcoinSignatureScript(txIn.scriptSig),
txHash = txIn.txHash, txOut = txIn.txOut, txinwitness = txIn.txinwitness
)
}

Expand All @@ -70,3 +73,12 @@ data class CqlBitcoinTxOut(
out = txOut.out, requiredSignatures = txOut.requiredSignatures
)
}

@UserDefinedType("script_sig")
data class CqlBitcoinSignatureScript(
val asm: String,
val hex: String
) {

constructor(scriptSig: SignatureScript) : this (asm = scriptSig.asm, hex = scriptSig.hex)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.springframework.data.cassandra.repository.CassandraRepository
import org.springframework.data.domain.Pageable
import org.springframework.data.domain.Slice
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import reactor.core.publisher.Flux


interface BitcoinContractSummaryRepository : ReactiveCrudRepository<CqlBitcoinContractSummary, String>
Expand All @@ -18,7 +19,9 @@ interface PageableBitcoinContractMinedBlockRepository: CassandraRepository<CqlBi
fun findAllByMinerContractHash(minerContractHash: String, page: Pageable): Slice<CqlBitcoinContractMinedBlock>
}

interface BitcoinContractTxRepository : ReactiveCrudRepository<CqlBitcoinContractTxPreview, MapId>
interface BitcoinContractTxRepository : ReactiveCrudRepository<CqlBitcoinContractTxPreview, MapId> {
fun findAllByContractHashAndBlockTime(contractHash: String, blockTime: Long): Flux<CqlBitcoinContractTxPreview>
}

interface PageableBitcoinContractTxRepository : CassandraRepository<CqlBitcoinContractTxPreview, MapId> {
fun findAllByContractHash(contractHash: String, page: Pageable): Slice<CqlBitcoinContractTxPreview>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ CREATE TABLE IF NOT EXISTS bitcoin.tx_preview_by_block (
PRIMARY KEY ( block_number, "index" )
) WITH CLUSTERING ORDER BY ( "index" ASC );


CREATE TYPE IF NOT EXISTS bitcoin.script_sig (
asm text,
hex text
);

CREATE TYPE IF NOT EXISTS bitcoin.tx_out (
contracts FROZEN < list < text > >,
Expand All @@ -38,7 +41,8 @@ CREATE TYPE IF NOT EXISTS bitcoin.tx_out (
CREATE TYPE IF NOT EXISTS bitcoin.tx_in (
contracts FROZEN < list < text > >,
amount decimal,
asm text,
scriptSig FROZEN < bitcoin.script_sig >,
txinwitness FROZEN < list < text > >,
tx_hash text,
tx_out int
);
Expand All @@ -48,6 +52,7 @@ CREATE TABLE IF NOT EXISTS bitcoin.tx (
block_number bigint,
block_hash text,
block_time timestamp,
first_seen_time timestamp,
size int,
coinbase text,
fee text,
Expand All @@ -61,6 +66,7 @@ CREATE TABLE IF NOT EXISTS bitcoin.tx (

CREATE TABLE IF NOT EXISTS bitcoin.block (
hash text,
parent_hash text,
number bigint PRIMARY KEY,
miner_contract_hash text,
block_reward decimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ CREATE TABLE IF NOT EXISTS bitcoin_cash.tx_preview_by_block (
) WITH CLUSTERING ORDER BY ( "index" ASC );


CREATE TYPE IF NOT EXISTS bitcoin.script_sig (
asm text,
hex text
);

CREATE TYPE IF NOT EXISTS bitcoin_cash.tx_out (
contracts FROZEN < list < text > >,
Expand All @@ -38,7 +42,8 @@ CREATE TYPE IF NOT EXISTS bitcoin_cash.tx_out (
CREATE TYPE IF NOT EXISTS bitcoin_cash.tx_in (
contracts FROZEN < list < text > >,
amount decimal,
asm text,
scriptSig FROZEN < bitcoin_cash.script_sig >,
txinwitness FROZEN < list < text > >,
tx_hash text,
tx_out int
);
Expand All @@ -48,6 +53,7 @@ CREATE TABLE IF NOT EXISTS bitcoin_cash.tx (
block_number bigint,
block_hash text,
block_time timestamp,
first_seen_time timestamp,
size int,
coinbase text,
fee text,
Expand All @@ -61,6 +67,7 @@ CREATE TABLE IF NOT EXISTS bitcoin_cash.tx (

CREATE TABLE IF NOT EXISTS bitcoin_cash.block (
hash text,
parent_hash text,
number bigint PRIMARY KEY,
miner_contract_hash text,
block_reward decimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.time.Instant
data class BitcoinBlock(
val height: Long,
val hash: String,
val parentHash: String,
val minerContractHash: String,
val blockReward: BigDecimal,
val txFees: BigDecimal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ data class BitcoinTx(

fun allContractsUsedInTransaction() = ins.flatMap { input -> input.contracts } +
outs.flatMap { output -> output.contracts }

fun mempoolState() = this.copy(blockNumber = -1, blockHash = null, index = -1, blockTime = null)
}

data class BitcoinTxIn(
val contracts: List<String>,
val amount: BigDecimal,
val asm: String,
val scriptSig: SignatureScript,
val txinwitness: List<String> = emptyList(),
val txHash: String,
val txOut: Int
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import fund.cyber.cassandra.bitcoin.model.CqlBitcoinContractMinedBlock
import fund.cyber.cassandra.bitcoin.model.CqlBitcoinBlock
import fund.cyber.cassandra.bitcoin.repository.BitcoinContractMinedBlockRepository
import fund.cyber.cassandra.bitcoin.repository.BitcoinBlockRepository
import fund.cyber.dump.common.filterNotContainsAllEventsOf
import fund.cyber.dump.common.toRecordEventsMap
import fund.cyber.dump.common.execute
import fund.cyber.dump.common.toFluxBatch
import fund.cyber.search.model.bitcoin.BitcoinBlock
import fund.cyber.search.model.chains.BitcoinFamilyChain
import fund.cyber.search.model.events.PumpEvent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.reactivestreams.Publisher
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.BatchMessageListener
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono


class BlockDumpProcess(
Expand All @@ -24,28 +27,28 @@ class BlockDumpProcess(

override fun onMessage(records: List<ConsumerRecord<PumpEvent, BitcoinBlock>>) {

val first = records.first()
val last = records.last()
log.info("Dumping batch of ${first.value().height}-${last.value().height} $chain blocks")

val recordsToProcess = records.toRecordEventsMap()
.filterNotContainsAllEventsOf(listOf(PumpEvent.NEW_BLOCK, PumpEvent.DROPPED_BLOCK))

val blocksToCommit = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.NEW_BLOCK) }.keys
val blocksToRevert = recordsToProcess.filter { entry -> entry.value.contains(PumpEvent.DROPPED_BLOCK) }.keys

blockRepository
.deleteAll(blocksToRevert.map { block -> CqlBitcoinBlock(block) })
.block()
blockRepository
.saveAll(blocksToCommit.map { block -> CqlBitcoinBlock(block) })
.collectList().block()

contractMinedBlockRepository
.deleteAll(blocksToRevert.map { block -> CqlBitcoinContractMinedBlock(block) })
.block()
contractMinedBlockRepository
.saveAll(blocksToCommit.map { block -> CqlBitcoinContractMinedBlock(block) })
.collectList().block()
log.info("Dumping batch of ${records.size} $chain blocks from offset ${records.first().offset()}")

records.toFluxBatch { event, block ->
return@toFluxBatch when (event) {
PumpEvent.NEW_BLOCK -> block.toNewBlockPublisher()
PumpEvent.NEW_POOL_TX -> Mono.empty()
PumpEvent.DROPPED_BLOCK -> block.toDropBlockPublisher()
}
}.execute()
}

private fun BitcoinBlock.toNewBlockPublisher(): Publisher<Any> {
val saveBlockMono = blockRepository.save(CqlBitcoinBlock(this))
val saveContractBlockMono = contractMinedBlockRepository.save(CqlBitcoinContractMinedBlock(this))

return Flux.concat(saveBlockMono, saveContractBlockMono)
}

private fun BitcoinBlock.toDropBlockPublisher(): Publisher<Any> {
val deleteBlockMono = blockRepository.delete(CqlBitcoinBlock(this))
val deleteContractBlockMono = contractMinedBlockRepository.delete(CqlBitcoinContractMinedBlock(this))

return Flux.concat(deleteBlockMono, deleteContractBlockMono)
}
}
Loading

0 comments on commit c56f604

Please sign in to comment.