#
# Copyright 2017 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from collections import namedtuple, OrderedDict
from functools import partial
from math import isnan
import logbook
import numpy as np
import pandas as pd
from six import iteritems, itervalues, PY2
from zipline.assets import Future
from zipline.finance.transaction import Transaction
import zipline.protocol as zp
from zipline.utils.sentinel import sentinel
from .position import Position
from ._finance_ext import (
PositionStats,
calculate_position_tracker_stats,
update_position_last_sale_prices,
)
log = logbook.Logger('Performance')
[docs]class PositionTracker(object):
"""The current state of the positions held.
Parameters
----------
data_frequency : {'daily', 'minute'}
The data frequency of the simulation.
"""
def __init__(self, data_frequency):
self.positions = OrderedDict()
self._unpaid_dividends = {}
self._unpaid_stock_dividends = {}
self._positions_store = zp.Positions()
self.data_frequency = data_frequency
# cache the stats until something alters our positions
self._dirty_stats = True
self._stats = PositionStats.new()
def update_position(self,
asset,
amount=None,
last_sale_price=None,
last_sale_date=None,
cost_basis=None):
self._dirty_stats = True
if asset not in self.positions:
position = Position(asset)
self.positions[asset] = position
else:
position = self.positions[asset]
if amount is not None:
position.amount = amount
if last_sale_price is not None:
position.last_sale_price = last_sale_price
if last_sale_date is not None:
position.last_sale_date = last_sale_date
if cost_basis is not None:
position.cost_basis = cost_basis
def execute_transaction(self, txn):
self._dirty_stats = True
asset = txn.asset
if asset not in self.positions:
position = Position(asset)
self.positions[asset] = position
else:
position = self.positions[asset]
position.update(txn)
if position.amount == 0:
del self.positions[asset]
try:
# if this position exists in our user-facing dictionary,
# remove it as well.
del self._positions_store[asset]
except KeyError:
pass
def handle_commission(self, asset, cost):
# Adjust the cost basis of the stock if we own it
if asset in self.positions:
self._dirty_stats = True
self.positions[asset].adjust_commission_cost_basis(asset, cost)
[docs] def handle_splits(self, splits):
"""Processes a list of splits by modifying any positions as needed.
Parameters
----------
splits: list
A list of splits. Each split is a tuple of (asset, ratio).
Returns
-------
int: The leftover cash from fractional shares after modifying each
position.
"""
total_leftover_cash = 0
for asset, ratio in splits:
if asset in self.positions:
self._dirty_stats = True
# Make the position object handle the split. It returns the
# leftover cash from a fractional share, if there is any.
position = self.positions[asset]
leftover_cash = position.handle_split(asset, ratio)
total_leftover_cash += leftover_cash
return total_leftover_cash
[docs] def earn_dividends(self, cash_dividends, stock_dividends):
"""Given a list of dividends whose ex_dates are all the next trading
day, calculate and store the cash and/or stock payments to be paid on
each dividend's pay date.
Parameters
----------
cash_dividends : iterable of (asset, amount, pay_date) namedtuples
stock_dividends: iterable of (asset, payment_asset, ratio, pay_date)
namedtuples.
"""
for cash_dividend in cash_dividends:
self._dirty_stats = True # only mark dirty if we pay a dividend
# Store the earned dividends so that they can be paid on the
# dividends' pay_dates.
div_owed = self.positions[cash_dividend.asset].earn_dividend(
cash_dividend,
)
try:
self._unpaid_dividends[cash_dividend.pay_date].append(div_owed)
except KeyError:
self._unpaid_dividends[cash_dividend.pay_date] = [div_owed]
for stock_dividend in stock_dividends:
self._dirty_stats = True # only mark dirty if we pay a dividend
div_owed = self.positions[
stock_dividend.asset
].earn_stock_dividend(stock_dividend)
try:
self._unpaid_stock_dividends[stock_dividend.pay_date].append(
div_owed,
)
except KeyError:
self._unpaid_stock_dividends[stock_dividend.pay_date] = [
div_owed,
]
[docs] def pay_dividends(self, next_trading_day):
"""
Returns a cash payment based on the dividends that should be paid out
according to the accumulated bookkeeping of earned, unpaid, and stock
dividends.
"""
net_cash_payment = 0.0
try:
payments = self._unpaid_dividends[next_trading_day]
# Mark these dividends as paid by dropping them from our unpaid
del self._unpaid_dividends[next_trading_day]
except KeyError:
payments = []
# representing the fact that we're required to reimburse the owner of
# the stock for any dividends paid while borrowing.
for payment in payments:
net_cash_payment += payment['amount']
# Add stock for any stock dividends paid. Again, the values here may
# be negative in the case of short positions.
try:
stock_payments = self._unpaid_stock_dividends[next_trading_day]
except KeyError:
stock_payments = []
for stock_payment in stock_payments:
payment_asset = stock_payment['payment_asset']
share_count = stock_payment['share_count']
# note we create a Position for stock dividend if we don't
# already own the asset
if payment_asset in self.positions:
position = self.positions[payment_asset]
else:
position = self.positions[payment_asset] = Position(
payment_asset,
)
position.amount += share_count
return net_cash_payment
def maybe_create_close_position_transaction(self, asset, dt, data_portal):
if not self.positions.get(asset):
return None
amount = self.positions.get(asset).amount
price = data_portal.get_spot_value(
asset, 'price', dt, self.data_frequency)
# Get the last traded price if price is no longer available
if isnan(price):
price = self.positions.get(asset).last_sale_price
return Transaction(
asset=asset,
amount=-amount,
dt=dt,
price=price,
order_id=None,
)
def get_positions(self):
positions = self._positions_store
for asset, pos in iteritems(self.positions):
# Adds the new position if we didn't have one before, or overwrite
# one we have currently
positions[asset] = pos.protocol_position
return positions
def get_position_list(self):
return [
pos.to_dict()
for asset, pos in iteritems(self.positions)
if pos.amount != 0
]
def sync_last_sale_prices(self,
dt,
data_portal,
handle_non_market_minutes=False):
self._dirty_stats = True
if handle_non_market_minutes:
previous_minute = data_portal.trading_calendar.previous_minute(dt)
get_price = partial(
data_portal.get_adjusted_value,
field='price',
dt=previous_minute,
perspective_dt=dt,
data_frequency=self.data_frequency,
)
else:
get_price = partial(
data_portal.get_scalar_asset_spot_value,
field='price',
dt=dt,
data_frequency=self.data_frequency,
)
update_position_last_sale_prices(self.positions, get_price, dt)
@property
def stats(self):
"""The current status of the positions.
Returns
-------
stats : PositionStats
The current stats position stats.
Notes
-----
This is cached, repeated access will not recompute the stats until
the stats may have changed.
"""
if self._dirty_stats:
calculate_position_tracker_stats(self.positions, self._stats)
self._dirty_stats = False
return self._stats
if PY2:
def move_to_end(ordered_dict, key, last=False):
if last:
ordered_dict[key] = ordered_dict.pop(key)
else:
# please don't do this in python 2 ;_;
new_first_element = ordered_dict.pop(key)
# the items (without the given key) in the order they were inserted
items = ordered_dict.items()
# reset the ordered_dict to re-insert in the new order
ordered_dict.clear()
ordered_dict[key] = new_first_element
# add the items back in their original order
ordered_dict.update(items)
else:
move_to_end = OrderedDict.move_to_end
PeriodStats = namedtuple(
'PeriodStats',
'net_liquidation gross_leverage net_leverage',
)
not_overridden = sentinel(
'not_overridden',
'Mark that an account field has not been overridden',
)
[docs]class Ledger(object):
"""The ledger tracks all orders and transactions as well as the current
state of the portfolio and positions.
Attributes
----------
portfolio : zipline.protocol.Portfolio
The updated portfolio being managed.
account : zipline.protocol.Account
The updated account being managed.
position_tracker : PositionTracker
The current set of positions.
todays_returns : float
The current day's returns. In minute emission mode, this is the partial
day's returns. In daily emission mode, this is
``daily_returns[session]``.
daily_returns_series : pd.Series
The daily returns series. Days that have not yet finished will hold
a value of ``np.nan``.
daily_returns_array : np.ndarray
The daily returns as an ndarray. Days that have not yet finished will
hold a value of ``np.nan``.
"""
def __init__(self, trading_sessions, capital_base, data_frequency):
if len(trading_sessions):
start = trading_sessions[0]
else:
start = None
# Have some fields of the portfolio changed? This should be accessed
# through ``self._dirty_portfolio``
self.__dirty_portfolio = False
self._immutable_portfolio = zp.Portfolio(start, capital_base)
self._portfolio = zp.MutableView(self._immutable_portfolio)
self.daily_returns_series = pd.Series(
np.nan,
index=trading_sessions,
)
# Get a view into the storage of the returns series. Metrics
# can access this directly in minute mode for performance reasons.
self.daily_returns_array = self.daily_returns_series.values
self._previous_total_returns = 0
# this is a component of the cache key for the account
self._position_stats = None
# Have some fields of the account changed?
self._dirty_account = True
self._immutable_account = zp.Account()
self._account = zp.MutableView(self._immutable_account)
# The broker blotter can override some fields on the account. This is
# way to tangled up at the moment but we aren't fixing it today.
self._account_overrides = {}
self.position_tracker = PositionTracker(data_frequency)
self._processed_transactions = {}
self._orders_by_modified = {}
self._orders_by_id = OrderedDict()
# Keyed by asset, the previous last sale price of positions with
# payouts on price differences, e.g. Futures.
#
# This dt is not the previous minute to the minute for which the
# calculation is done, but the last sale price either before the period
# start, or when the price at execution.
self._payout_last_sale_prices = {}
@property
def todays_returns(self):
# compute today's returns in returns space instead of portfolio-value
# space to work even when we have capital changes
return (
(self.portfolio.returns + 1) /
(self._previous_total_returns + 1) -
1
)
@property
def _dirty_portfolio(self):
return self.__dirty_portfolio
@_dirty_portfolio.setter
def _dirty_portfolio(self, value):
if value:
# marking the portfolio as dirty also marks the account as dirty
self.__dirty_portfolio = self._dirty_account = value
else:
self.__dirty_portfolio = value
def start_of_session(self, session_label):
self._processed_transactions.clear()
self._orders_by_modified.clear()
self._orders_by_id.clear()
# Save the previous day's total returns so that ``todays_returns``
# produces returns since yesterday. This does not happen in
# ``end_of_session`` because we want ``todays_returns`` to produce the
# correct value in metric ``end_of_session`` handlers.
self._previous_total_returns = self.portfolio.returns
def end_of_bar(self, session_ix):
# make daily_returns hold the partial returns, this saves many
# metrics from doing a concat and copying all of the previous
# returns
self.daily_returns_array[session_ix] = self.todays_returns
def end_of_session(self, session_ix):
# save the daily returns time-series
self.daily_returns_series[session_ix] = self.todays_returns
def sync_last_sale_prices(self,
dt,
data_portal,
handle_non_market_minutes=False):
self.position_tracker.sync_last_sale_prices(
dt,
data_portal,
handle_non_market_minutes=handle_non_market_minutes,
)
self._dirty_portfolio = True
@staticmethod
def _calculate_payout(multiplier, amount, old_price, price):
return (price - old_price) * multiplier * amount
def _cash_flow(self, amount):
self._dirty_portfolio = True
p = self._portfolio
p.cash_flow += amount
p.cash += amount
[docs] def process_transaction(self, transaction):
"""Add a transaction to ledger, updating the current state as needed.
Parameters
----------
transaction : zp.Transaction
The transaction to execute.
"""
asset = transaction.asset
if isinstance(asset, Future):
try:
old_price = self._payout_last_sale_prices[asset]
except KeyError:
self._payout_last_sale_prices[asset] = transaction.price
else:
position = self.position_tracker.positions[asset]
amount = position.amount
price = transaction.price
self._cash_flow(
self._calculate_payout(
asset.price_multiplier,
amount,
old_price,
price,
),
)
if amount + transaction.amount == 0:
del self._payout_last_sale_prices[asset]
else:
self._payout_last_sale_prices[asset] = price
else:
self._cash_flow(-(transaction.price * transaction.amount))
self.position_tracker.execute_transaction(transaction)
# we only ever want the dict form from now on
transaction_dict = transaction.to_dict()
try:
self._processed_transactions[transaction.dt].append(
transaction_dict,
)
except KeyError:
self._processed_transactions[transaction.dt] = [transaction_dict]
[docs] def process_splits(self, splits):
"""Processes a list of splits by modifying any positions as needed.
Parameters
----------
splits: list[(Asset, float)]
A list of splits. Each split is a tuple of (asset, ratio).
"""
leftover_cash = self.position_tracker.handle_splits(splits)
if leftover_cash > 0:
self._cash_flow(leftover_cash)
[docs] def process_order(self, order):
"""Keep track of an order that was placed.
Parameters
----------
order : zp.Order
The order to record.
"""
try:
dt_orders = self._orders_by_modified[order.dt]
except KeyError:
self._orders_by_modified[order.dt] = OrderedDict([
(order.id, order),
])
self._orders_by_id[order.id] = order
else:
self._orders_by_id[order.id] = dt_orders[order.id] = order
# to preserve the order of the orders by modified date
move_to_end(dt_orders, order.id, last=True)
move_to_end(self._orders_by_id, order.id, last=True)
[docs] def process_commission(self, commission):
"""Process the commission.
Parameters
----------
commission : zp.Event
The commission being paid.
"""
asset = commission['asset']
cost = commission['cost']
self.position_tracker.handle_commission(asset, cost)
self._cash_flow(-cost)
def close_position(self, asset, dt, data_portal):
txn = self.position_tracker.maybe_create_close_position_transaction(
asset,
dt,
data_portal,
)
if txn is not None:
self.process_transaction(txn)
[docs] def process_dividends(self, next_session, asset_finder, adjustment_reader):
"""Process dividends for the next session.
This will earn us any dividends whose ex-date is the next session as
well as paying out any dividends whose pay-date is the next session
"""
position_tracker = self.position_tracker
# Earn dividends whose ex_date is the next trading day. We need to
# check if we own any of these stocks so we know to pay them out when
# the pay date comes.
held_sids = set(position_tracker.positions)
if held_sids:
cash_dividends = adjustment_reader.get_dividends_with_ex_date(
held_sids,
next_session,
asset_finder
)
stock_dividends = (
adjustment_reader.get_stock_dividends_with_ex_date(
held_sids,
next_session,
asset_finder
)
)
# Earning a dividend just marks that we need to get paid out on
# the dividend's pay-date. This does not affect our cash yet.
position_tracker.earn_dividends(
cash_dividends,
stock_dividends,
)
# Pay out the dividends whose pay-date is the next session. This does
# affect out cash.
self._cash_flow(
position_tracker.pay_dividends(
next_session,
),
)
def capital_change(self, change_amount):
self.update_portfolio()
portfolio = self._portfolio
# we update the cash and total value so this is not dirty
portfolio.portfolio_value += change_amount
portfolio.cash += change_amount
[docs] def transactions(self, dt=None):
"""Retrieve the dict-form of all of the transactions in a given bar or
for the whole simulation.
Parameters
----------
dt : pd.Timestamp or None, optional
The particular datetime to look up transactions for. If not passed,
or None is explicitly passed, all of the transactions will be
returned.
Returns
-------
transactions : list[dict]
The transaction information.
"""
if dt is None:
# flatten the by-day transactions
return [
txn
for by_day in itervalues(self._processed_transactions)
for txn in by_day
]
return self._processed_transactions.get(dt, [])
[docs] def orders(self, dt=None):
"""Retrieve the dict-form of all of the orders in a given bar or for
the whole simulation.
Parameters
----------
dt : pd.Timestamp or None, optional
The particular datetime to look up order for. If not passed, or
None is explicitly passed, all of the orders will be returned.
Returns
-------
orders : list[dict]
The order information.
"""
if dt is None:
# orders by id is already flattened
return [o.to_dict() for o in itervalues(self._orders_by_id)]
return [
o.to_dict()
for o in itervalues(self._orders_by_modified.get(dt, {}))
]
@property
def positions(self):
return self.position_tracker.get_position_list()
def _get_payout_total(self, positions):
calculate_payout = self._calculate_payout
payout_last_sale_prices = self._payout_last_sale_prices
total = 0
for asset, old_price in iteritems(payout_last_sale_prices):
position = positions[asset]
payout_last_sale_prices[asset] = price = position.last_sale_price
amount = position.amount
total += calculate_payout(
asset.price_multiplier,
amount,
old_price,
price,
)
return total
[docs] def update_portfolio(self):
"""Force a computation of the current portfolio state.
"""
if not self._dirty_portfolio:
return
portfolio = self._portfolio
pt = self.position_tracker
portfolio.positions = pt.get_positions()
position_stats = pt.stats
portfolio.positions_value = position_value = (
position_stats.net_value
)
portfolio.positions_exposure = position_stats.net_exposure
self._cash_flow(self._get_payout_total(pt.positions))
start_value = portfolio.portfolio_value
# update the new starting value
portfolio.portfolio_value = end_value = portfolio.cash + position_value
pnl = end_value - start_value
if start_value != 0:
returns = pnl / start_value
else:
returns = 0.0
portfolio.pnl += pnl
portfolio.returns = (
(1 + portfolio.returns) *
(1 + returns) -
1
)
# the portfolio has been fully synced
self._dirty_portfolio = False
@property
def portfolio(self):
"""Compute the current portfolio.
Notes
-----
This is cached, repeated access will not recompute the portfolio until
the portfolio may have changed.
"""
self.update_portfolio()
return self._immutable_portfolio
def calculate_period_stats(self):
position_stats = self.position_tracker.stats
portfolio_value = self.portfolio.portfolio_value
if portfolio_value == 0:
gross_leverage = net_leverage = np.inf
else:
gross_leverage = position_stats.gross_exposure / portfolio_value
net_leverage = position_stats.net_exposure / portfolio_value
return portfolio_value, gross_leverage, net_leverage
[docs] def override_account_fields(self,
settled_cash=not_overridden,
accrued_interest=not_overridden,
buying_power=not_overridden,
equity_with_loan=not_overridden,
total_positions_value=not_overridden,
total_positions_exposure=not_overridden,
regt_equity=not_overridden,
regt_margin=not_overridden,
initial_margin_requirement=not_overridden,
maintenance_margin_requirement=not_overridden,
available_funds=not_overridden,
excess_liquidity=not_overridden,
cushion=not_overridden,
day_trades_remaining=not_overridden,
leverage=not_overridden,
net_leverage=not_overridden,
net_liquidation=not_overridden):
"""Override fields on ``self.account``.
"""
# mark that the portfolio is dirty to override the fields again
self._dirty_account = True
self._account_overrides = kwargs = {
k: v for k, v in locals().items() if v is not not_overridden
}
del kwargs['self']
@property
def account(self):
if self._dirty_account:
portfolio = self.portfolio
account = self._account
# If no attribute is found in the ``_account_overrides`` resort to
# the following default values. If an attribute is found use the
# existing value. For instance, a broker may provide updates to
# these attributes. In this case we do not want to over write the
# broker values with the default values.
account.settled_cash = portfolio.cash
account.accrued_interest = 0.0
account.buying_power = np.inf
account.equity_with_loan = portfolio.portfolio_value
account.total_positions_value = (
portfolio.portfolio_value - portfolio.cash
)
account.total_positions_exposure = (
portfolio.positions_exposure
)
account.regt_equity = portfolio.cash
account.regt_margin = np.inf
account.initial_margin_requirement = 0.0
account.maintenance_margin_requirement = 0.0
account.available_funds = portfolio.cash
account.excess_liquidity = portfolio.cash
account.cushion = (
(portfolio.cash / portfolio.portfolio_value)
if portfolio.portfolio_value else
np.nan
)
account.day_trades_remaining = np.inf
(account.net_liquidation,
account.gross_leverage,
account.net_leverage) = self.calculate_period_stats()
account.leverage = account.gross_leverage
# apply the overrides
for k, v in iteritems(self._account_overrides):
setattr(account, k, v)
# the account has been fully synced
self._dirty_account = False
return self._immutable_account