Detección de cambios direccionales en Python

El algoritmo

Esta semana mi mujer compartió de forma humorística un tweet en el que decía que de buena mañana su marido le hablaba de cambios direccionales, series temporales y fractales. Nos pudimos reír con el tema y fue la comidilla entre algunos amigos unos días. El caso es que estuve esta semana pasada programando algunos algoritmos sobre el tema. Para entender la envergadura del problema, valga una breve lista de algunos estudios sobre esta materia pues no es un asunto trivial.

  • Detección y análisis de CD (cambio direccional) de saltos en datos intra-día. Hvozdyk, Lyudmyla (PhD).
  • Identificación de CD basado en eventos. Alkhamees, Nora. IADS.
  • Negociación de Hombro-Cabeza-Hombro basado en CD. Li, Shengnan. CCFEA.
  • Hechos estilizados (regularidades empíricas) en CD, negociación algorítmica. Golub, Anton.
  • Negociación algorítmica. Kampouridis, Michael (PhD). Univ. de Kent.
  • Análisis de rebasamiento. Sun, Jianyong (PhD). Universidad de Essex.
  • Caracterización de CD, datos de alta frecuenca. Serguieva, Antoaneta (PhD). Computación financiera y Analítica, UCL.
  • Caracterización del CD. Gao, Jing. Universidad de Beihang.
  • Carecterización de CD: glosario y comparación. Tao, Ran (PhD). CCFEA.
  • Negociación algorítmica (proyecto de doctorado). Ye, Alan. Universidad de Greenwhich.

Y claro, uno se pregunta qué hace tanto doctor y doctorando escribiendo sobre el asunto. Y al final se tira una semana estudiando el problema y buscando la solución más sencilla. Es lo que tiene dedicarse a la ingeniería del software: cuando más fácil y más rápido, mejor.

Así que he escrito una pequeña variación del algoritmo de Bill Williams para calcular fractales. Sus aplicaciones en trading son innumerables. Desde cálculo de canales, soportes y resistencias, trading armónico, categorización de sesiones del mercado bursátil, determinación de momento y tendencia del valor de instrumentos financieros, análisis de patrones técnicos y todo lo que a quien lo explote se le ocurra.

import numpy as np
import pandas as pd
 
class Fractals(object):
    __data = None
    __peaks = None
    __valleys = None
    __column_mode = None
 
    def __init__(self):
        pass
 
    def getFractals(self, data, column_mode="high", depth=3):
        Bars = data.shape[0]
        peaksBuffer = np.empty(Bars)
        valleysBuffer = np.empty(Bars)
        peaksBuffer [ : ] = np.NaN
        valleysBuffer [ : ] = np.NaN
 
        i = depth
 
        while (i < Bars - depth - 1):
 
            is_upper_fractal = False
            is_lower_fractal = False
 
            lower_range_pos = i - depth
            upper_range_pos = i + depth + 1
            N = lower_range_pos + depth
 
            lower_range_values = data.iloc[lower_range_pos:N][column_mode].values
            upper_range_values = data.iloc[N + 1:upper_range_pos][column_mode].values
            N_value = data.iloc[N][column_mode]
 
            # Basic Fractal:
            # Peaks
            if np.append([N_value], lower_range_values).argmax() == 0 and\
                    np.append([N_value],upper_range_values).argmax() == 0:
 
                if N_value not in lower_range_values:
                    is_upper_fractal = True
                    peaksBuffer[N] = N_value
 
            # Valleys
            if not is_upper_fractal:
                if np.append([N_value], lower_range_values).argmin() == 0 and \
                        np.append([N_value], upper_range_values).argmin() == 0:
 
                    if N_value not in lower_range_values:
                        is_lower_fractal = True
                        valleysBuffer[N] = N_value
            i += 1
 
            peaksBufferSeries = pd.Series(peaksBuffer, name="peaks", index=data.index).dropna()
            valleysBufferSeries = pd.Series(valleysBuffer, name="valleys", index=data.index).dropna()
 
            self.__data = data
            self.__peaks = peaksBufferSeries
            self.__valleys = valleysBufferSeries
            self.__column_mode = column_mode
 
        return pd.merge(
            peaksBufferSeries, valleysBufferSeries, left_index=True, right_index=True, how="outer", sort=False
        )

¿Cómo se utiliza esto? Hace falta pasarle un DataFrame de pandas cuyo índice sea de tipo datetime y que esté ordenado ascendente.

Los datos

Supongamos que tenemos el siguiente DataTrame:

date open high low close
2003-05-28 08:40:00 1.1782 1.17968 1.17619 1.1795
2003-05-28 08:45:00 1.17967 1.18012 1.17967 1.18004
2003-05-28 08:50:00 1.17996 1.18007 1.17939 1.17939
2003-05-28 08:55:00 1.17932 1.17944 1.17691 1.17695
2003-05-28 09:00:00 1.17702 1.17796 1.17702 1.1779
2003-05-28 09:05:00 1.17795 1.17823 1.17759 1.17759
2003-05-28 09:10:00 1.17768 1.17805 1.17756 1.17802
2003-05-28 09:15:00 1.17791 1.17802 1.1778 1.17787
2003-05-28 09:20:00 1.17796 1.17923 1.17773 1.17923
2003-05-28 09:25:00 1.17933 1.17935 1.17786 1.17789
2003-05-28 09:30:00 1.17793 1.17832 1.17762 1.17793
2003-05-28 09:35:00 1.17794 1.17839 1.17789 1.17802
2003-05-28 09:40:00 1.17808 1.17837 1.177 1.17703
2003-05-28 09:45:00 1.17673 1.17827 1.17666 1.17827
2003-05-28 09:50:00 1.17819 1.1783 1.17779 1.17788
2003-05-28 09:55:00 1.17784 1.17872 1.17784 1.17872
2003-05-28 10:00:00 1.17847 1.17867 1.17833 1.17844
2003-05-28 10:05:00 1.17863 1.17886 1.17852 1.17862
2003-05-28 10:10:00 1.17865 1.17872 1.17771 1.17772
2003-05-28 10:15:00 1.1775 1.1787 1.17744 1.17839
2003-05-28 10:20:00 1.1784 1.17883 1.17837 1.17859
2003-05-28 10:25:00 1.17859 1.17869 1.17842 1.17855
2003-05-28 10:30:00 1.17859 1.1787 1.17839 1.17851
2003-05-28 10:35:00 1.17845 1.17874 1.17831 1.17867
2003-05-28 10:40:00 1.17866 1.17977 1.17818 1.17861
2003-05-28 10:45:00 1.17852 1.17866 1.17822 1.17856
2003-05-28 10:50:00 1.1787 1.17893 1.1786 1.1786
2003-05-28 10:55:00 1.17855 1.17855 1.17786 1.17786
2003-05-28 11:00:00 1.17769 1.17783 1.17426 1.17429
2003-05-28 11:05:00 1.17429 1.17474 1.17411 1.17456
2003-05-28 11:10:00 1.17467 1.17516 1.17462 1.17484
2003-05-28 11:15:00 1.17472 1.17517 1.17217 1.17517
2003-05-28 11:20:00 1.17518 1.17537 1.17493 1.17493
2003-05-28 11:25:00 1.17493 1.17532 1.17469 1.17511
2003-05-28 11:30:00 1.17504 1.17504 1.17436 1.17469
2003-05-28 11:35:00 1.17471 1.17563 1.17471 1.17558
2003-05-28 11:40:00 1.17559 1.17793 1.17528 1.17529
2003-05-28 11:45:00 1.17519 1.17559 1.17506 1.17547
2003-05-28 11:50:00 1.17558 1.17572 1.17533 1.17533
2003-05-28 11:55:00 1.17536 1.17536 1.17307 1.17307
2003-05-28 12:00:00 1.17268 1.17268 1.17166 1.17167
2003-05-28 12:05:00 1.17168 1.17237 1.17147 1.17224
2003-05-28 12:10:00 1.17214 1.17228 1.17194 1.17217
2003-05-28 12:15:00 1.17224 1.17237 1.17187 1.17209
2003-05-28 12:20:00 1.17205 1.17229 1.17153 1.17153
2003-05-28 12:25:00 1.17121 1.17226 1.17047 1.17204
2003-05-28 12:30:00 1.17208 1.1726 1.17181 1.17259
2003-05-28 12:35:00 1.17261 1.17264 1.17182 1.17192
2003-05-28 12:40:00 1.17203 1.17337 1.17189 1.17337
2003-05-28 12:45:00 1.17351 1.1736 1.1714 1.17153

Si el campo «date» es una columna y el dataframe se llama «data», por ejemplo, tendríamos que escribir lo siguiente para poder usarlo como argumento, pues el  índice ha de ser datetime:

data = data.set_index("date")

Por supuesto el código de la clase anterior se puede modificar para funcionar con numpy o cualquier otra biblioteca.

La aplicación

Si queremos obtener los máximos y mínimos locales de los valores «high» de nuestra serie de datos utilizando 6 valores para calcular los picos y valles, es tan simple como hacer esto:

#"data" es un DataFrame que contiene los datos de la tabla de arriba.
fr = fractals()
high_fractals = fr.getFractals(data=data,column_mode="high", depth=3)

El resultado se cargaría en la variable high_fractals y sería el siguiente:

Y para extraer los máximos, por ejemplo, solo bastaría ejecutar el siguiente código:

Con una profundidad de 2, éste sería el resultado para los datos:

import matplotlib.pyplot as plt
 
fr = fractals() 
#"data" es un DataFrame que contiene los datos de la tabla de arriba. fr = fractals() 
high_fractals = fr.getFractals(data=data,column_mode="high", depth=2)
 
fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
plt.plot(high_fractals["valleys"].dropna(), marker='^', linestyle="none", markersize=7,  color="red")
plt.plot(high_fractals["peaks"].dropna(), marker='v', linestyle="none", markersize=7,  color="green")
plt.plot(data.high, color="black")
plt.show()

El resultado es más que satisfactorio para la mayoría de escenarios.  En mi caso, lo voy a mejorar añadiendo también identificación de movimientos a partir de cierto nivel (porcentaje, pips, etc). Pero, en general, este sencillo algoritmo cumple con todas las expectativas que personalmente necesito para marcar cambios direccionales en series temporales. Menos es más.

Si en vez de mostrar la serie sobre la que hemos obtenido los picos y valles, empleamos toda la serie de precios, el resultado será mucho más obvio. Para el siguiente ejemplo, vamos a utilizar un total de 200 puntos, en vez de los 50 de antes.

#cargamos un dataframe con 200 datos.
data = all_historical_data.dropna()[5000:5200]
 
fr = fractals()
 
high_fractals = fr.getFractals(data=data,column_mode="high", depth=3)
high_fractals["fractal_type"]="highs"
high_fractals=high_fractals.reset_index().set_index(["fractal_type","date"])
 
low_fractals = fr.getFractals(data=data,column_mode="low", depth=3)
low_fractals["fractal_type"]="lows"
low_fractals=low_fractals.reset_index().set_index(["fractal_type","date"])
 
# Vamos a incluir una serie de librerías que nos interesan.
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import matplotlib.dates as mdates
from mpl_finance import candlestick_ohlc
from datetime import date
import datetime
 
fig, ax1 = plt.subplots(figsize=(20,5))
ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
 
#Creamos un conjunto de velas japonesas.
number_of_ticks = data.shape[0]
dates = data.index
x=0
ohlc=[]
while x < len(dates):
    d = mdates.date2num(dates[x])
    append_me = d, data["open"].values[x], data["high"].values[x], data["low"].values[x], data["close"].values[x]
    ohlc.append(append_me)
    x += 1
 
# Añadimos bordes a las velas.
lines, patches = candlestick_ohlc(ax1, ohlc, width=0.2*2.5/number_of_ticks, colorup="green", colordown="red")
for line, patch in zip(lines, patches):
    patch.set_edgecolor("black")
    patch.set_linewidth(0.9)
    patch.set_antialiased(False)
    line.set_color("black")
    line.set_zorder(0)
 
#Seleccionamos los máximos de los Highs (Higher Highs) y los mínimos de los lows (Lower Lows)
hfs = high_fractals.loc["highs"].drop("valleys",axis=1).dropna()
lfs = low_fractals.loc["lows"].drop("peaks",axis=1).dropna()
 
plt.plot(hfs, marker="o", linestyle="None")
plt.plot(lfs, marker="o", linestyle="None")
plt.savefig("fractals.png")
plt.show()

Y el resultado es la gráfica siguiente:

¿Y qué se puede hacer con estos puntos? Con dos puntos podemos pintar una línea, por ejemplo.

data = high_fractals["peaks"].dropna()
TF_seconds = 300 #timeframe de 5 minutos.
peaks = []
 
for i in range(1, data.shape[0]):
    peaks.append( { "type": "H", 
    "x1": data[i - 1:i].index, "y1": data[i - 1], 
    "x2": data[i:i + 1].index, "y2": data[i], 
    )
 
for point in peaks:
    diference = (point["x2"] - point['x1']).seconds[0]
    units = diference / TF_seconds
    slope = (point["y2"] - point["y1"]) / units
    slope = slope / 0.0001
    peaks[i]["slope"] = slope
    peaks[i]["units"] = units
    i += 1

Y he ahí todos los vectores directores de todas las líneas de todos los picos consecutivos. Con meta-información relativa a su pendiente e incremento de X en formato escalar (aparte de datetime). La imagen de abajo ilustra la estructura de la implementación completa, donde la pendiente está calculada en pips:

La imaginación es el límite. Podríamos utilizar estos datos para analizar varias características de nuestros vectores directores. Veamos un ejemplo con la implementación completa. Para ello vamos a unir en un solo dataframe todos los fractales y todos los vectores directores en segundo lugar:

all_analyzed_data=pd.concat([high_fractals,low_fractals])

Lo que nos generaría una estructura similar a ésta:

Con los datos de los fractales, vamos a generar los vectores directores entre cada punto y todos los puntos posteriores al mismo.

Digits=4
# Low peaks -> the lower high points
# Low valleys -> the lower low points
# High peaks -> the Higher high points
# High valleys -> THe Higher high points
 
tl_higher_highs = Trendlines(dataframe=all_analyzed_data.loc["highs"],column_mode="peaks",seconds=300, digits=Digits)
tl_higher_lows = Trendlines(dataframe=all_analyzed_data.loc["highs"],column_mode="valleys",seconds=300, digits=Digits)
 
tl_lower_lows = Trendlines(dataframe=all_analyzed_data.loc["lows"],column_mode="peaks",seconds=300, digits=Digits)
tl_lower_highs = Trendlines(dataframe=all_analyzed_data.loc["lows"],column_mode="valleys",seconds=300, digits=Digits)

Cada una de las cuatro variables, tendría una estructura similar a ésta:

Cada fila de la tabla anterior representa un vector director que une dos puntos de los identificados con el algoritmo de fractales del principio. Como estoy analizando los Higher Highs, me voy a centrar en los picos. Los datos del dataframe incluyen:

  • Tipo de vector: valle o pico.
  • x1: fecha del primer punto.
  • y1: precio del primer punto.
  • x2, y2: fecha y precio del segundo punto.
  • height: Altura entre y1 e y2.
  • pips: Altura entre y1 e y2 en pips.
  • slope: pendiente del vector en pips por unidades temporales (pips/5m en este caso).
  • width: Distancia horizontal en número de unidades temporales entre el punto 1 y el punto 2 (cada unidad temporal son 5 minutos en este caso).

Quitamos información innecesaria y preparamos el dataset para su análisis.

data_hhs = tl_higher_highs.getFractalsVectors()
data_hhs_peaks=data_hhs.reset_index().drop(["x1","x2","index_type"],axis=1)

Con lo que ahora tenemos el siguiente DataFrame:

Veamos algunas características. Como la distribución de número de pips de cada vector director.

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
from scipy import stats
 
sns.distplot(data_hhs_peaks["pips"].values)
plt.show()

Comparar la altura en pips de los puntos con la pendiente de la recta que los une.

df = data_hhs_peaks[["pips","slope"]]
#df = df[df.slope.apply(abs)<=1.5]
df.columns = ["x","y"]
 
g = sns.jointplot(x="x", y="y", data=df, kind="kde", color="brown")
g.plot_joint(plt.scatter, c="b", s=30, linewidth=1, marker="+")
g.ax_joint.collections[0].set_alpha(0)
g.set_axis_labels("$Pendiente$", "$Altura$");

Y comprender mejor el entorno sobre el que estamos trabajando. Porque no es sino con la comprensión profunda del problema, con lo que más oportunidades tenemos para encontrar soluciones óptimas.

Entornos de Inteligencia Artificial para Trading y Simulando Ticks

A estas alturas, si has estado intentando diseñar modelos de machine learning  o inteligencia artificial para hacer trading, ya te habrás dado cuenta de que la cosa no es tan fácil como normalizar unos cuantos datos históricos de Forex o Bolsa, meterlos en una red neuronal y esperar que tu output tipo {mantener, comprar, vender} te salga lo suficiente bien balanceado como para que la red negocie por ti.

De hecho, en el pasado, mi experiencia es que arquitecturas como XGBOOST daban mejores predicciones con datos suavizados con medias móviles de 2 o 3 muestras que perceptrones o LSTMs.

Todo depende del tiempo que uno tiene disponible. Si es poco, como en mi caso, lo mejor es ir primero a los más seguro y luego ir refinando el trabajo para poder, con la mayor brevedad posible, aprobar o descartar una metodología. Lo de «la mayor brevedad» es un concepto bastante relativo.

En mi caso, la mayor brevedad han sido unos cinco años de trabajo intenso más un esfuerzo financiero relevante. En mi caso, como ya he comentado en otras entradas, he estudiado casi todas las técnicas de trading, las he probado, validado; una tras otra, durante años. Tanto el trading manual como el trading algorítmico. Es decir, que no solo he estudiado trading o pagado por aprender, sino que además he tenido que investigar mucho sobre algoritmos, estructuras, lenguajes, estadística (incluyendo un master en Ciencia de Datos), probabilidad, machine learning, inteligencia artificial y otras materias relevantes.

En ese sentido, se puede decir que me he hecho una idea genérica relativamente amplia del asunto. Escribo este rollo, no para ponerme medallas, sino para que quien lea este texto entienda que el trading no es llegar y triunfar. En el mejor de los casos hacen falta entre 3 o 5 años para empezar a entenderlo y puede que la tasa de fracaso sea del 95%. Esto es, que de cada 100 personas que intentan hacer trading, solo 5 conseguiran cubrir costes y un porcentaje menor ganar algo. Y no solo depende de lo buena que sea la técnica utilizada, el 80% de los factores van a ser ajenos al dominio de una técnica de trading determinada. De hecho, no me pongo medallas porque no enseño mis técnicas gratuitamente nunca. Tengo por ahí una mención a un seminario de 5 días de trading por 12000 euros por si alguien tiene curiosidad por saber qué principios y técnicas utilizo.

Lo que voy a mostrar ahora es una implementación personalizada de un proveedor de datos virtual; que es más sencillo de programar que un canal de comunicación con mi broker, de forma que puedo hacer simulaciones y estudios exactamente igual que si estuviera conectado, pero con mucho menos esfuerzo. Con bastante esfuerzo, pero con mucho menos.

A estas alturas ya funciona razonablemente bien, es un borrador del final, pero ya tiene suficiente complejidad para que valga la pena como ejemplo. Me he programado un broker virtual también, porque me gusta la modularidad. Este broker puede recibir datos de distintas fuentes. Para ello, uso una interfazclase abstracta (depende del lenguaje) que define unos métodos y propiedades iguales, sea cual sea la fuente de datos.

Una de las fuentes de datos que utilizo son ficheros csv en formato tick (date, ask, bid). Estos los he convertido en OHLC para Ask y OHLC para bid en marcos temporales de 1-minuto y 5-minutos. Para mí es suficiente, pero si alguien quiere más granularidad y exactitud, puede usar directamente los datos ask-bid, o hacer un muestreo a 30-segundos, 10-segundos, etc. para los datos tick. Para que se entienda mejor, tengo:

  1. Un único csv con todos los datos OHLC en el marco temporal de 5-minutos entre 2003 y 2019. Este fichero se utiliza para dibujar la gráfica OHLC, entrenar un modelo estadístico o lo que sea.
  2. Un fichero por año de datos OHLC en el marco temporal de 1-minuto. Este fichero se utiliza para extraer los ticks que tendrán lugar hasta que la siguiente barra OHLC se tenga que dibujar. Son como los datos que nos van llegando del broker en tiempo real y muestran cómo va cambiando el precio del instrumento.

Como observación, estuve decidiendo si utilizar datos tick ASK/BID, pero me pareció que tanta precisión no era necesaria y que iba a utilizar un generador de ruído para calcular el spread mediante un generador de números aleatorios con una distribución T de student o bien promediando High-Low de mi muestra histórica. Pero eso es otra historia. Recordemos que compramos a precio Ask, pero vendemos a precio Bid. La comisión por operación también tiene que tenerse en cuenta, por ejemplo.

Vamos a plantear el sistema:

Datos históricos en 5 minutos
(última fila, núm. filas=n)
(AMD = año-mes-día)

Datos en «tiempo real» en 1 minuto
(todos los ticks desde que cierra la vela histórica hasta que abre la siguiente).
Date Open High Low Close
AMD 08:00:00 On Hn Ln Cn

Tengamos en cuenta que la vela de 8:00 en el TF de 5m contiene todos los precios entre las 8:00 y las 8H 4' 59''.

Por lo que las señales en tiempo real tienen que empezar a las 8H 5' 00''.

Date Open High Low Close
AMD 08:05:00  O1 H1  L1  C
AMD 08:06:00 O2 H2  L2  C
AMD 08:07:00 O3 H3  L3  C
AMD 08:08:00 O4 H4  L4  C
08:09:00 O5 H5  L5  C

Empecemos con el código:

# Como está orientado a csv, entonces no voy a incluir información sobre conexiones mediante sockets, puertos, ejecutables, etc...
# Definimos una clase abstracta que sirva de interfaz para el resto que vengan.
 
class Provider_abstract():
 
    def __init__(self):
        raise NotImplementedError
 
    def getRTPricesNext(self):
        raise NotImplementedError
 
    def getHistPricesNext(self):
        raise NotImplementedError
 
    def setTickDataPath(self):
        raise NotImplementedError
 
    def setHistoricalFile(self):
        raise NotImplementedError

Los métodos tendrán las siguientes funciones:

  • getRTPricesNext: Devuelve el dataframe con los ticks.
  • getHistPricesNext: Devuelve un histórico de precios.
  • setTickDataPath: El directorio donde están los datos tick.
  • setHistoricalFile: La ruta al fichero con los datos históricos.

Y ahora procedemos a la implementación, veamos el código completo primero.

from gym_gspfx.gspProviders.provider import Provider_abstract
import pandas as pd
 
# Reads TF data and generates tick data from the same or other data source.
# For instance, reads 5m data nd generates ticks from 1m data.
 
class Provider(Provider_abstract):
 
    def __init__(self, historical_file, tick_data_path, start_row=0, number_of_samples=300, \
                 tick_file_prefix="EURUSD_1M_"):
 
        self.min_year=None
        self.max_year=None
        self.__historical_file = ""
        self.__tick_data_path = ""
        self.setHistoricalFile(historical_file)
        self.setTickDataPath(tick_data_path)
        self.current_start_row_index = None
        self.number_of_samples = number_of_samples
        self.next_future_start_row = start_row
        self.last_closed_row = None
        self.next_future_row_to_deliver_index = 0
        self.next_two_future_rows_to_deliver = None
        self.__tick_data_files = {"EMPTY": None}
        self.tick_file_prefix=tick_file_prefix
 
 
    def setHistoricalFile(self,fileName):
        self.__historical_file = pd.read_csv(fileName, parse_dates=["date"]).set_index("date")
 
    def setTickDataPath(self, tick_data_path):
        self.__tick_data_path=tick_data_path
 
 
    def getHistPricesNext(self):
        self.current_start_row_index = self.next_future_row_to_deliver_index
        self.next_future_row_to_deliver_index += 1
 
        __df = self.__historical_file.iloc[self.current_start_row_index:self.current_start_row_index + self.number_of_samples]
 
        self.last_closed_row = __df.tail(1)
 
        self.next_two_future_rows_to_deliver = pd.DataFrame( \
            self.__historical_file.iloc[self.current_start_row_index + self.number_of_samples:\
                                        self.next_future_row_to_deliver_index + self.number_of_samples + 1])
        return __df.copy()
 
 
    def getRTPricesNext(self):
 
        if self.last_closed_row is None:
            return None
 
        if "EMPTY" in self.__tick_data_files.keys() and self.__historical_file.shape[0]>0:
            del self.__tick_data_files["EMPTY"]
 
        self.min_year = self.last_closed_row.index.year.values[0]
        self.max_year = self.next_two_future_rows_to_deliver.head(1).index.year.values[0]
 
        if self.max_year not in self.__tick_data_files.keys():
            self.__tick_data_files[self.max_year] = \
                pd.read_csv(self.__tick_data_path + "EURUSD_1M_" + str(self.max_year) + ".csv", parse_dates=["date"])
 
            if self.min_year in self.__tick_data_files.keys() and self.min_year != self.max_year:
                del self.__tick_data_files[self.min_year]
 
        initial_date = self.next_two_future_rows_to_deliver.head(1).index.values[0]
        end_date = self.next_two_future_rows_to_deliver.tail(1).index.values[0]
 
        __df = self.__tick_data_files[self.max_year]
        __df = __df[(__df["date"] >= initial_date) & (__df["date"] < end_date)]
 
        return __df.copy()

El método __init__ se encarga de parametrizar los valores iniciales de la clase, como la fila del histórico desde el que se comenzará a extraer datos, el número de filas del histórico que se extraerán. También se han de definir al instanciar la clase la ruta completa al fichero de históricos y la carpeta donde están los ficheros de ticks.

Los ficheros de ticks los guardo en un diccionario conforme los necesito por motivos prácticos. He optado por usar ficheros de 1m, pero podría tener otra estructura y he preferido prever la posibilidad de tener que utilizar ficheros de tick múltiples. Todo ello se define en el constructor, junto con atributos que serán utilizados por la clase.

 def __init__(self, historical_file, tick_data_path, start_row=0, number_of_samples=300, \
                 tick_file_prefix="EURUSD_1M_"):
 
        self.min_year=None
        self.max_year=None
        self.__historical_file = ""
        self.__tick_data_path = ""
        self.setHistoricalFile(historical_file)
        self.setTickDataPath(tick_data_path)
        self.current_start_row_index = None
        self.number_of_samples = number_of_samples
        self.next_future_start_row = start_row
        self.last_closed_row = None
        self.next_future_row_to_deliver_index = 0
        self.next_two_future_rows_to_deliver = None
        self.__tick_data_files = {"EMPTY": None}
        self.tick_file_prefix=tick_file_prefix

Tras ello, los métodos que cargan el fichero de históricos y configuran la ruta de los ficheros de ticks.
Nótese que utilizo la librería de pandas y que asumo que los datos vienen ya ordenados por fecha, ascendente.

    def setHistoricalFile(self,fileName):
        self.__historical_file = pd.read_csv(fileName, parse_dates=["date"]).set_index("date")
 
   def setTickDataPath(self, tick_data_path):
       self.__tick_data_path=tick_data_path

Cada vez que queremos avanzar 1 vela, tenemos que llamara a este procedimiento.
El procedimiento devuelve un DataFrame con el número de filas indicado en el constructor o modificado en el atributo number_of_samples.
Cada vez que se le llama, avanza 1 fila.

    def getHistPricesNext(self):
        self.current_start_row_index = self.next_future_row_to_deliver_index
        self.next_future_row_to_deliver_index += 1
 
        __df = self.__historical_file.iloc[self.current_start_row_index:self.current_start_row_index + self.number_of_samples]
 
        self.last_closed_row = __df.tail(1)
 
        self.next_two_future_rows_to_deliver = pd.DataFrame( \
            self.__historical_file.iloc[self.current_start_row_index + self.number_of_samples:\
                                        self.next_future_row_to_deliver_index + self.number_of_samples + 1])
        return __df.copy()

Por útlimo, tenemos el método que nos devuelve todos los ticks que se producirán desde el cierre de la última vela de los históricos hasta justo antes de la apertura del siguiente.

   def getRTPricesNext(self):
 
        if self.last_closed_row is None:
            return None
 
        if "EMPTY" in self.__tick_data_files.keys() and self.__historical_file.shape[0]>0:
            del self.__tick_data_files["EMPTY"]
 
        self.min_year = self.last_closed_row.index.year.values[0]
        self.max_year = self.next_two_future_rows_to_deliver.head(1).index.year.values[0]
 
        if self.max_year not in self.__tick_data_files.keys():
            self.__tick_data_files[self.max_year] = \
                pd.read_csv(self.__tick_data_path + "EURUSD_1M_" + str(self.max_year) + ".csv", parse_dates=["date"])
 
            if self.min_year in self.__tick_data_files.keys() and self.min_year != self.max_year:
                del self.__tick_data_files[self.min_year]
 
        initial_date = self.next_two_future_rows_to_deliver.head(1).index.values[0]
        end_date = self.next_two_future_rows_to_deliver.tail(1).index.values[0]
 
        __df = self.__tick_data_files[self.max_year]
        __df = __df[(__df["date"] >= initial_date) & (__df["date"] < end_date)]
 
        return __df.copy()

Estudiando el código anterior se puede hacer una idea de cómo realizar un proveedor de ticks y datos financieros personalizado, así como adaptarlo a las necesidades propias. Una aplicación puede ser un simulador de trading por ejemplo. Yo lo uso para mis proyectos de inteligencia artificial, de manera que no necesito estar conectado a ningún proveedor de precios online para simular series temporales y mercados financieros.

En la siguiente imagen se muestra cómo usar la clase creada con un ejemplo real.

Generador de datos históricos y tick en Python

Y si volvemos a ejecutar los métodos, obtenemos la vela siguiente con sus históricos y los ticks correspondientes.

Si alguien no conoce la librería Pandas y no entiende alguna de las expresiones del código, se pueden consultar en la documentación oficial, en el siguiente enlace: http://pandas.pydata.org/pandas-docs/stable/

Y preguntados:

  • ¿Cómo puede obtenerse algo negociando si no es desarrollando una técnica para acertar las suficientes veces como para cubrir los costes de los fallos?
  • ¿Cómo puede mantenerse un negocio sin conocer el riesgo asociado y una buena gestión financiera y de riesgos conforme?
  • ¿cómo podría un cerebro humano ser mejor calculando aspectos técnicos y numéricos que una compleja computadora?

Cómo cambiar precios de un marco temporal a otro con Python

En esta entrada voy a explicar cómo conseguir precios en formato tick y transformarlos en cualquier otro marco temporal. En concreto, vamos a coger un fichero de precios tick a tick y lo convertiremos en un fichero OHLC de 5 minutos. Podríamos convertirlo en cualquier otro formato, como 10 segundos o 1 día.

No voy a hacer publicidad de programas de software. Hay muchas maneras de conseguir datos. Por ejemplo, podemos exportar de nuestro cliente de trading los datos que va recibiendo del servidor. Pero en este caso, vamos a usar un gran servicio del broker Dukascopy que permite descargar ficheros de datos de múltiples fuentes gratis.

Para descargarlos, hay que ir a la web https://www.dukascopy.com/swiss/spanish/marketwatch/historical/ y elegir un instrumento. Una vez elegido podemos ya seleccionar los datos a descargar. Hay programas que lo hacen de forma automática, solo tienes que buscar en Internet para encontrar alguno gratis, que los hay.

En mi caso, me he descargado todos los datos tick a tick del par de divisas EURUSD entre 2003 y 2019 en formato CSV. Esto son unos 13Gb de tamaño. Las columnas que me interesan son: [«date»,»ask»,»bid»]. El tamaño es inmanejable, así que en principio, para echarle un vistazo, se puede usar la librería de Python llamada daske que tiene una especie de Pandas.DataFrame con algunas de las funciones de pandas y soporta ficheros enormes, distribuidos, etc…

Pero, en general, daske, no me vale para lo que quiero hacer, así que lo que voy a hacer en primer lugar es convertir el fichero único CSV de 13GB en unos más pequeños, porque si no colapsaré la memoria de mi equipo al procesar tanta información.

Pandas puede convertir en trozos un fichero más grande, así que he escrito este código para reducir el tamaño individual de cada archivo en bloques de 5E6 filas:

import pandas as pd
 
def process(chunk,count):
    prefix = chunk.iloc[0].date.split(" ")[0]
    chunk.to_csv(prefix + "_EURUSDTICK_" + str(count) + ".csv",index=False)
 
count=0
chunksize = 5 * 10 ** 6 
for chunk in pd.read_csv(r"./sample_data/TICK_EURUSD.csv",header=-1, chunksize=chunksize):
    count=count+1
    chunk.columns = ["date","ask","bid","volume","zero"]
    chunk=chunk[["date","ask","bid"]]
    process(chunk, count)

Tras el proceso, se me han generado unos 60 ficheros más pequeños.

Aún así, la columna date está en formato texto, tipo «2016.05.01 1:23:04.092», con lo que necesitaré formatearla a un tipo fecha. Para ello, voy a cargar cada uno de los ficheros y le aplicaré una función de conversión en el campo fecha.

afiles = [f for f in os.listdir("./sample_data") if ".csv" in f]
 
for file in afiles:
    print (file)
    df = pd.read_csv(file)
    df["date"]=df["date"].apply(lambda x:datetime.datetime.strptime(x,'%Y.%m.%d %H:%M:%S.%f'))
    df.to_csv(file)

Una vez reescritos o sobreescritos los csv parciales con la columna corregida, pasamos a cambiar el muestreo. Voy a preparar mis datos para convertir los ficheros de ticks en ficheros de 5m.

Como tenemos los precios Ask y Bid, nos interesa tener OHLC con ambos, para determinar cuál es el rango aplicable en compra y cuál en venta. Además, podemos calcular spreads de diferentes maneras. Veamos un ejemplo con uno de los ficheros generados cualquiera. Para que funcione, la columna date la tenemos que convertir en el índice del DataFrame.

processed = "./sample_data/2015.02.05_EURUSDTICK_41.csv"
df = pd.read_csv(processed)
df_5m_ohlc = df[["date","ask","bid"]].set_index("date").resample('5Min').ohlc()

Y tendríamos finalmente ya los datos convertidos de formato tick a format OHLC. Veamos el formato de los ficheros, cómo eran originalmente y cómo quedan al final.

Antes:

Después:

La utilidad de estos procesos es notablemente alta para la simulación de estrategias basadas en análisis técnico, ya que nos permite estudiar el marco temporal de referencia a la vez que generar los ticks, precios y spreads de forma exacta.

Predicting Stock Exchange Prices with Machine Learning

This article will describe how to get an average 75% prediction accuracy in next day’s average price change. The target magnitude is the 2-day simple moving average. The reason is that if we do not apply smoothing to daily prices, the forecasts are much harder to get. The minimum possible smoothing is two days, and that will be the target: altering actual prices as little as possible.

I have selected randomly a company from the New York Stock Exchange, it was «CNH Industrial NV». No reason for that, it has been a completely random choice among a couple thousand files I have generated extracted from either Yahoo! or Google finance, I do not remember the source. The files are uploaded here: https://drive.google.com/open?id=18DkJeCqpibKdR8ezwk9hGjdHYSGwovWH.

The method is valid for any financial data as long as it has the same structure. I have also tested it with Forex data getting similar accuracy levels with currencies such as EURUSD, GBPUSD or USDJPY. The interesting point of forecasting those quotes is that by examining where it fails, I think you will improve your price action trading skills and your understanding of the market and what matters.

Data Collection and Variable Configuration

There are millions of possible variable candidates that may seem valid to be analyzed. And which will be the target value we will try to aim? I like thinking that price is like any other object subject to physical laws. It reacts to market forces, it has an inertia, velocity, acceleration, etc.

The forces may be volume, it may have a potential energy depending if it is very high or very low, the rate of change may be important and so on. There are other many factors we could analyze such as gaps, breakouts, technical patterns, candlestick analysis or price distribution within space just to mention a few. For this example we will only be focused on price action and volume.

I have the files saved in csv format to be used with Excel, so let’s start loading the csv file into a DataFrame object using Python.

# Importing all the libraries that we will use.
 
import pandas as pd
import matplotlib.pyplot as plt
import xgboost as xgb
from sklearn.metrics import accuracy_score
 
#Load the data from a csv file.
CNHI = {"stock_name":"CNH Industrial NV", "data": pd.read_csv("./data/CNHI_excel.csv",sep="\t",header=0,decimal=',')}
 
CNHI["data"]=CNHI["data"].drop("Adj Close",1).set_index("Date")

The previous code will, after extracting, remove a column that won’t be used («Adj Close») and creating an index using the «Date» column. The date is not a variable we may use for forecasting, so there is no need to keep it as a column of the dataset.

The data now has the typical structure of the financial data: Date, Open, High, Low and Close. The first three rows are shown in the next table:

Date Open High Low Close Volume
2013-09-30 2.75 13.08 12.5 12.5 352800
2013-10-01 12.76 13.16 12.75 12.92 1477900
2013-10-02 13.02 13.08 12.87 12.9 1631900

Predictors

We are going to omit High, Low and Open, using only Open and Volume for the study. Let’s start preparing the data for the analysis. The predictors (X variables) to be used to predict the target magnitued (y variable) will be the following ones:

  • Two day simple moving average (SMA2). The formula is (Ct – Ct-1)/2, being Ct equal to current day’s open price and Ct-1 to previous day’s open price. This formula is applied to each row of the data set.
Predictors = pd.DataFrame({"sma2":CNHI["data"].Open.rolling(window=2).mean()})
  • 1 day window SMA2. The previous day’s SMA2 value.
Predictors["sma2_1"] = Predictors.sma2.shift(1)

And the other predictors will be:

  • Current day SMA2 increment. (SMA2t – SMA2t-1).
  • 1 day window SMA2 increment. (SMA2t-1 – SMA2t-2).
  • Current day volume increment. (Volt – Volt-1).
  • Current day volume rate of change. (Volt – Volt-1)/Volt
  • 1 day window open price. (Ct-1)
  • Current day open price increment. Ct – Ct-1
  • Current day open price. Ct.
Predictors["sma2_increment"] = Predictors.sma2.diff()  
 
Predictors["sma2_1_increment"] = Predictors.sma2_1.diff()  
 
Predictors["vol_increment"] = CNHI["data"].Volume.diff()
 
Predictors["vol_rel_increment"] = CNHI["data"].Volume.diff() / CNHI["data"].Volume
 
Predictors["open_1"] = CNHI["data"].Open.shift(1)
 
Predictors["open_incr"] = CNHI["data"].Open - CNHI["data"].Open.shift(1)
 
Predictors["open"] = CNHI["data"].Open
 
# The rows with nulls generated by rolling values will be removed.
Predictors = Predictors.dropna()

A sample of the first 5 rows:

Date sma2 sma2_1 sma2_increment sma2_1_increment vol_increment vol_rel_increment open_1 open_incr open
2013-10-03 12.895 12.89 0.005 0.135 -495500 -0.436026047 13.02 -0.25 12.77
2013-10-04 12.765 12.895 -0.13 0.005 -21800 -0.019558586 12.77 -0.01 12.76
2013-10-07 12.59 12.765 -0.175 -0.13 -400 -0.000359002 12.76 -0.34 12.42
2013-10-08 12.42 12.59 -0.17 -0.175 104600 0.08582212 12.42 0 12.42
2013-10-09 12.5 12.42 0.08 -0.17 -232400 -0.235604217 12.42 0.16 12.58

 

Target Variable

This will be a classification variable, if the average price will go either up or down the next day.  The target will be forecasting the difference between today’s price and tomorrow’s price (which is unkonwn).

target = pd.DataFrame({"value":Predictors.sma2.shift(-1) - Predictors.sma2}).dropna()

After calculating the data to predict, the three first rows look like this:

Date value
2013-10-03 -0.13
2013-10-04 -0.175
2013-10-07 -0.17

Finally we will match predictors and target values by date and remove those rows without counterpart in the other table.

X = pd.merge(Predictors, target,left_index=True,right_index=True)[Predictors.columns]
y = pd.merge(Predictors, target,left_index=True,right_index=True)[target.columns]

X now contains the predictors and y the target values. The table contains 1,059 records at this moment.

Extreme Gradient Boosting prediction

The extreme gradient boosting is an exceptional machine learning technique for many reasons. It is based on decision trees and it has nice features such as residuals analysis, non-linear regression, feature selection tools, overfitting avoidance and many other more. Other machine learning alternative techniques commonly used for this type of analysis are Support Vector Machines, Neural Networks and Random Forest. I have used all of those for predicting market prices and the Extreme Gradient Boosting is always my first choice.

We will setup the regression model using the 65% of the data and with that model, the next 35% of the data will be used to predict future values. This simulates the actual scenario in which we have past data to train our model and we want to predict how a future datum will be with the data we currently have on hand. The data will be split in two sets: the training set to preconfigure the model and the testing set that won’t be used to build the model, but only to test if it works as expected with new data.

train_samples = int(X.shape[0] * 0.65)
 
X_train = X.iloc[:train_samples]
X_test = X.iloc[train_samples:]
 
y_train = y.iloc[:train_samples]
y_test = y.iloc[train_samples:]

After applying the data splitting, the test data set contains:

  • Train records: 688.
  • Test records: 371.

The target variables will be transformed for binary classification. A positive change in the value of prices will be classified as 1 and a non-positive change as 0.

def getBinary(val):
    if val>0:
        return 1
    else:
        return 0
 
# and the transformation is applied on the test data for later use.
# The train data will be transformed while it is being fit.
y_test_binary = pd.DataFrame(y_test["value"].apply(getBinary)

And next, the model is trained and the test data predicted to verify the accuracy of the system:

regressor = xgb.XGBRegressor(gamma=0.0,n_estimators=150,base_score=0.7,colsample_bytree=1,learning_rate=0.01)
 
xgbModel = regressor.fit(X_train,y_train.value.apply(getBinary))
 
y_predicted = xgbModel.predict(X_test)
y_predicted_binary = [1 if yp >=0.5 else 0 for yp in y_predicted]
 
print (accuracy_score(y_test_binary,y_predicted_binary))
 
 
Out: 0.76010781671159033

So, the initial accuracy without optimizing the model is 76% predicting the daily average price change for each of the the next 371 trading days.

The model can be optimized, I have just used a few parameters to avoid overfitting with the training data and adjusting the learning rate.

The features used should also be analyzed to avoid using redundant variables and to discard those with no correlation. New features should be added to try improved approaches and, to sum up, there is a lot of work that could be done around this basic model.

XGBOOST has also ways to study features. Let’s take a look at their importance:

fig = plt.figure(figsize=(8,8))
plt.xticks(rotation='vertical')
plt.bar([i for i in range(len(xgbModel.feature_importances_))], xgbModel.feature_importances_.tolist(), tick_label=X_test.columns, color="chocolate")
plt.show()

It is obvious that the field extension is huge and especially interesting.