Skip to content

Commit

Permalink
Rewrite block/operation wait helpers (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-kus committed Jun 9, 2021
1 parent b3aaaad commit 5273e1a
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 118 deletions.
63 changes: 15 additions & 48 deletions src/pytezos/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from decimal import Decimal
from typing import Optional, Tuple, Union
from typing import List, Optional, Union

from pytezos.block.header import BlockHeader
from pytezos.context.mixin import ContextMixin # type: ignore
Expand Down Expand Up @@ -202,57 +202,24 @@ def wait(
num_blocks_wait: int = 5,
time_between_blocks: Optional[int] = None,
prev_hash: Optional[str] = None,
) -> Tuple[OperationGroup, ...]:
) -> List[dict]:
"""Wait for multiple injected operations to get enough confirmations
:param min_confirmations: number of block injections to wait for before returning
:param num_blocks_wait: number of blocks to wait for injection
:param time_between_blocks: override the corresponding parameter from constants
:param prev_hash: Current block hash (optional). If not set, current head is used.
"""
logger.info('Waiting for %s confirmations in %s blocks', min_confirmations, num_blocks_wait)
confirmations = {opg.opg_hash: 0 for opg in operation_groups}
for _ in range(num_blocks_wait):
logger.info('Waiting for the next block')
prev_hash = self.shell.wait_next_block(time_between_blocks=time_between_blocks, prev_hash=prev_hash)
block_operations = self.shell.blocks[prev_hash].operations.managers()

for opg in operation_groups:
if confirmations[opg.opg_hash] == 0:
res = next((item for item in block_operations if item['hash'] == opg.opg_hash), None)
if res is not None:
logger.info('Operation %s was included in block %s', opg.opg_hash, prev_hash)
confirmations[opg.opg_hash] = 1
if not OperationResult.is_applied(res):
raise RpcError.from_errors(OperationResult.errors(res)) from None
else:
confirmations[opg.opg_hash] += 1
logger.info('Got %s/%s confirmations for %s', confirmations[opg.opg_hash], min_confirmations, opg.opg_hash)

if any(value == 0 for value in confirmations.values()):
pending_operations = self.shell.mempool.pending_operations.flatten()
for opg in operation_groups:
if confirmations[opg.opg_hash] == 0:
res = next((item for item in pending_operations if item['hash'] == opg.opg_hash), None)
if res is not None:
logger.info('Operation %s is still in mempool', opg.opg_hash)
if not OperationResult.is_applied(res):
raise RpcError.from_errors(OperationResult.errors(res)) from None

for opg in operation_groups:
if confirmations[opg.opg_hash] == 0:
confirmations[opg.opg_hash] = self.shell.get_confirmations(
opg_hash=opg.opg_hash,
kind=opg.contents[0]['kind'],
branch=opg.branch,
head=prev_hash,
)
if confirmations[opg.opg_hash] == 0:
raise ValueError(f'Operation {opg.opg_hash} is not found') from None

if all(value >= min_confirmations for value in confirmations.values()):
return operation_groups

required_confirmations = min_confirmations * len(operation_groups)
gathered_confirmations = sum(confirmations.values())
raise TimeoutError(f'Operations got {gathered_confirmations}/{required_confirmations} confirmations in {num_blocks_wait} blocks')
opg_hashes = []
for opg in operation_groups:
if opg.opg_hash is None:
raise ValueError('All operations must have hash assigned')
opg_hashes.append(opg.opg_hash)

return self.shell.wait_operations(
opg_hashes=opg_hashes,
ttl=num_blocks_wait,
min_confirmations=min_confirmations,
current_block_hash=prev_hash,
time_between_blocks=time_between_blocks,
)
23 changes: 21 additions & 2 deletions src/pytezos/contract/call.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from decimal import Decimal
from pprint import pformat
from typing import Union
from typing import Optional, Union

from deprecation import deprecated # type: ignore

Expand All @@ -12,6 +12,7 @@
from pytezos.michelson.format import micheline_to_michelson
from pytezos.michelson.repl import Interpreter
from pytezos.michelson.sections.storage import StorageSection
from pytezos.operation import DEFAULT_BURN_RESERVE, DEFAULT_GAS_RESERVE
from pytezos.operation.content import format_mutez, format_tez
from pytezos.operation.group import OperationGroup

Expand Down Expand Up @@ -63,10 +64,28 @@ def as_transaction(self) -> OperationGroup:
)

@property # type: ignore
@deprecated(deprecated_in='3.0.0', removed_in='3.1.0', details='use `as_transaction()` instead')
@deprecated(deprecated_in='3.0.0', removed_in='4.0.0', details='use `as_transaction()` instead')
def operation_group(self) -> OperationGroup:
return self.as_transaction().fill()

def send(
self,
gas_reserve: int = DEFAULT_GAS_RESERVE,
burn_reserve: int = DEFAULT_BURN_RESERVE,
min_confirmations: int = 0,
ttl: Optional[int] = None,
) -> 'OperationGroup':
"""Fill, sign, and broadcast the transaction
:param gas_reserve: Add a safe reserve for dynamically calculated gas limit (default is 100).
:param burn_reserve: Add a safe reserve for dynamically calculated storage limit (default is 100).
:param min_confirmations: number of block injections to wait for before returning (default is 0, i.e. async mode)
:param ttl: Number of blocks to wait in the mempool before removal (default is 5 for public network, 60 for sandbox)
:return: OperationGroup with hash filled
"""
return self.as_transaction().send(gas_reserve=gas_reserve, burn_reserve=burn_reserve, min_confirmations=min_confirmations, ttl=ttl)

@deprecated(deprecated_in='3.2.2', removed_in='4.0.0', details='use `send()` instead')
def inject(self, _async=True, preapply=True, check_result=True, num_blocks_wait=5) -> OperationGroup:
"""Send operation to blockchain."""
return (
Expand Down
5 changes: 1 addition & 4 deletions src/pytezos/contract/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ def __call__(self, *args, **kwargs) -> ContractCall:
py_obj = None

return ContractCall(
context=self._spawn_context(
address=self.address,
script=self.script
),
context=self._spawn_context(address=self.address, script=self.script),
parameters=self.encode(py_obj),
)

Expand Down
45 changes: 11 additions & 34 deletions src/pytezos/operation/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def inject(
:param min_confirmations: number of block injections to wait for before returning
:returns: operation group with metadata (raw RPC response)
"""
self.context.reset()
self.context.reset() # reset counter

opg_hash = self.shell.injection.operation.post(
operation=self.binary_payload(),
Expand All @@ -356,39 +356,16 @@ def inject(
**self.json_payload(),
}

logger.info('Waiting for %s confirmations in %s blocks', min_confirmations, num_blocks_wait)
in_mempool = True
confirmations = 0
for _ in range(num_blocks_wait):
logger.info('Waiting for the next block')
self.shell.wait_next_block(time_between_blocks=time_between_blocks)

if in_mempool:
try:
pending_opg = self.shell.mempool.pending_operations[opg_hash]
if not OperationResult.is_applied(pending_opg):
raise RpcError.from_errors(OperationResult.errors(pending_opg))
logger.info('Operation %s is still in mempool', opg_hash)
continue
except StopIteration:
in_mempool = False

try:
res = self.shell.blocks[-1:].find_operation(opg_hash)
except StopIteration:
logger.info('Operation %s not found in lastest block', opg_hash)
continue

if check_result:
if not OperationResult.is_applied(res):
raise RpcError.from_errors(OperationResult.errors(res))

confirmations += 1
logger.info('Got %s/%s confirmations', confirmations, min_confirmations)
if confirmations == min_confirmations:
return res

raise TimeoutError(f'Operation {opg_hash} got {confirmations} confirmations in {num_blocks_wait} blocks')
operations = self.shell.wait_operations(
opg_hashes=[opg_hash], ttl=num_blocks_wait, min_confirmations=min_confirmations, time_between_blocks=time_between_blocks
)

assert len(operations) == 1
if check_result:
if not OperationResult.is_applied(operations[0]):
raise RpcError.from_errors(OperationResult.errors(operations[0]))

return operations[0]

@deprecated(deprecated_in='3.1.0', removed_in='4.0.0', details='use `run_operation()` instead')
def result(self) -> List[OperationResult]:
Expand Down
27 changes: 2 additions & 25 deletions src/pytezos/rpc/node.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,13 @@
import json
import time
from functools import wraps
from pprint import pformat
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

import requests
import requests.exceptions
from simplejson import JSONDecodeError

from pytezos.logging import logger

REQUEST_RETRY_COUNT = 3
REQUEST_RETRY_SLEEP = 1


def _retry(fn: Callable):
@wraps(fn)
def wrapper(*args, **kwargs):
for attempt in range(REQUEST_RETRY_COUNT):
logger.debug('Node request attempt %s/%s', attempt + 1, REQUEST_RETRY_COUNT)
try:
return fn(*args, **kwargs)
except requests.exceptions.ConnectionError as e:
if attempt + 1 == REQUEST_RETRY_COUNT:
raise e
logger.warning(e)
time.sleep(REQUEST_RETRY_SLEEP)

return wrapper


def _urljoin(*args: str) -> str:
return "/".join(map(lambda x: str(x).strip('/'), args))
Expand Down Expand Up @@ -93,7 +72,6 @@ def __init__(self, uri: Union[str, List[str]]) -> None:
if not isinstance(uri, list):
uri = [uri]
self.uri = uri
self._session = requests.Session()

def __repr__(self) -> str:
res = [
Expand All @@ -113,7 +91,7 @@ def request(self, method: str, path: str, **kwargs) -> requests.Response:
:returns: node response
"""
logger.debug('>>>>> %s %s\n%s', method, path, json.dumps(kwargs, indent=4))
res = self._session.request(
res = requests.request(
method=method,
url=_urljoin(self.uri[0], path),
headers={
Expand All @@ -132,7 +110,6 @@ def request(self, method: str, path: str, **kwargs) -> requests.Response:
logger.debug('<<<<< %s\n%s', res.status_code, json.dumps(res.json(), indent=4))
return res

@_retry
def get(self, path: str, params: Optional[Dict[str, Any]] = None, timeout: Optional[int] = None) -> requests.Response:
return self.request('GET', path, params=params, timeout=timeout).json()

Expand Down
Loading

0 comments on commit 5273e1a

Please sign in to comment.