Detección de cambios direccionales en Python

El algoritmo

Esta semana mi mujer compartió de forma humorística un tweet en el que decía que de buena mañana su marido le hablaba de cambios direccionales, series temporales y fractales. Nos pudimos reír con el tema y fue la comidilla entre algunos amigos unos días. El caso es que estuve esta semana pasada programando algunos algoritmos sobre el tema. Para entender la envergadura del problema, valga una breve lista de algunos estudios sobre esta materia pues no es un asunto trivial.

  • Detección y análisis de CD (cambio direccional) de saltos en datos intra-día. Hvozdyk, Lyudmyla (PhD).
  • Identificación de CD basado en eventos. Alkhamees, Nora. IADS.
  • Negociación de Hombro-Cabeza-Hombro basado en CD. Li, Shengnan. CCFEA.
  • Hechos estilizados (regularidades empíricas) en CD, negociación algorítmica. Golub, Anton.
  • Negociación algorítmica. Kampouridis, Michael (PhD). Univ. de Kent.
  • Análisis de rebasamiento. Sun, Jianyong (PhD). Universidad de Essex.
  • Caracterización de CD, datos de alta frecuenca. Serguieva, Antoaneta (PhD). Computación financiera y Analítica, UCL.
  • Caracterización del CD. Gao, Jing. Universidad de Beihang.
  • Carecterización de CD: glosario y comparación. Tao, Ran (PhD). CCFEA.
  • Negociación algorítmica (proyecto de doctorado). Ye, Alan. Universidad de Greenwhich.

Y claro, uno se pregunta qué hace tanto doctor y doctorando escribiendo sobre el asunto. Y al final se tira una semana estudiando el problema y buscando la solución más sencilla. Es lo que tiene dedicarse a la ingeniería del software: cuando más fácil y más rápido, mejor.

Así que he escrito una pequeña variación del algoritmo de Bill Williams para calcular fractales. Sus aplicaciones en trading son innumerables. Desde cálculo de canales, soportes y resistencias, trading armónico, categorización de sesiones del mercado bursátil, determinación de momento y tendencia del valor de instrumentos financieros, análisis de patrones técnicos y todo lo que a quien lo explote se le ocurra.

import numpy as np
import pandas as pd
 
class Fractals(object):
    __data = None
    __peaks = None
    __valleys = None
    __column_mode = None
 
    def __init__(self):
        pass
 
    def getFractals(self, data, column_mode="high", depth=3):
        Bars = data.shape[0]
        peaksBuffer = np.empty(Bars)
        valleysBuffer = np.empty(Bars)
        peaksBuffer [ : ] = np.NaN
        valleysBuffer [ : ] = np.NaN
 
        i = depth
 
        while (i < Bars - depth - 1):
 
            is_upper_fractal = False
            is_lower_fractal = False
 
            lower_range_pos = i - depth
            upper_range_pos = i + depth + 1
            N = lower_range_pos + depth
 
            lower_range_values = data.iloc[lower_range_pos:N][column_mode].values
            upper_range_values = data.iloc[N + 1:upper_range_pos][column_mode].values
            N_value = data.iloc[N][column_mode]
 
            # Basic Fractal:
            # Peaks
            if np.append([N_value], lower_range_values).argmax() == 0 and\
                    np.append([N_value],upper_range_values).argmax() == 0:
 
                if N_value not in lower_range_values:
                    is_upper_fractal = True
                    peaksBuffer[N] = N_value
 
            # Valleys
            if not is_upper_fractal:
                if np.append([N_value], lower_range_values).argmin() == 0 and \
                        np.append([N_value], upper_range_values).argmin() == 0:
 
                    if N_value not in lower_range_values:
                        is_lower_fractal = True
                        valleysBuffer[N] = N_value
            i += 1
 
            peaksBufferSeries = pd.Series(peaksBuffer, name="peaks", index=data.index).dropna()
            valleysBufferSeries = pd.Series(valleysBuffer, name="valleys", index=data.index).dropna()
 
            self.__data = data
            self.__peaks = peaksBufferSeries
            self.__valleys = valleysBufferSeries
            self.__column_mode = column_mode
 
        return pd.merge(
            peaksBufferSeries, valleysBufferSeries, left_index=True, right_index=True, how="outer", sort=False
        )

¿Cómo se utiliza esto? Hace falta pasarle un DataFrame de pandas cuyo índice sea de tipo datetime y que esté ordenado ascendente.

Los datos

Supongamos que tenemos el siguiente DataTrame:

date open high low close
2003-05-28 08:40:00 1.1782 1.17968 1.17619 1.1795
2003-05-28 08:45:00 1.17967 1.18012 1.17967 1.18004
2003-05-28 08:50:00 1.17996 1.18007 1.17939 1.17939
2003-05-28 08:55:00 1.17932 1.17944 1.17691 1.17695
2003-05-28 09:00:00 1.17702 1.17796 1.17702 1.1779
2003-05-28 09:05:00 1.17795 1.17823 1.17759 1.17759
2003-05-28 09:10:00 1.17768 1.17805 1.17756 1.17802
2003-05-28 09:15:00 1.17791 1.17802 1.1778 1.17787
2003-05-28 09:20:00 1.17796 1.17923 1.17773 1.17923
2003-05-28 09:25:00 1.17933 1.17935 1.17786 1.17789
2003-05-28 09:30:00 1.17793 1.17832 1.17762 1.17793
2003-05-28 09:35:00 1.17794 1.17839 1.17789 1.17802
2003-05-28 09:40:00 1.17808 1.17837 1.177 1.17703
2003-05-28 09:45:00 1.17673 1.17827 1.17666 1.17827
2003-05-28 09:50:00 1.17819 1.1783 1.17779 1.17788
2003-05-28 09:55:00 1.17784 1.17872 1.17784 1.17872
2003-05-28 10:00:00 1.17847 1.17867 1.17833 1.17844
2003-05-28 10:05:00 1.17863 1.17886 1.17852 1.17862
2003-05-28 10:10:00 1.17865 1.17872 1.17771 1.17772
2003-05-28 10:15:00 1.1775 1.1787 1.17744 1.17839
2003-05-28 10:20:00 1.1784 1.17883 1.17837 1.17859
2003-05-28 10:25:00 1.17859 1.17869 1.17842 1.17855
2003-05-28 10:30:00 1.17859 1.1787 1.17839 1.17851
2003-05-28 10:35:00 1.17845 1.17874 1.17831 1.17867
2003-05-28 10:40:00 1.17866 1.17977 1.17818 1.17861
2003-05-28 10:45:00 1.17852 1.17866 1.17822 1.17856
2003-05-28 10:50:00 1.1787 1.17893 1.1786 1.1786
2003-05-28 10:55:00 1.17855 1.17855 1.17786 1.17786
2003-05-28 11:00:00 1.17769 1.17783 1.17426 1.17429
2003-05-28 11:05:00 1.17429 1.17474 1.17411 1.17456
2003-05-28 11:10:00 1.17467 1.17516 1.17462 1.17484
2003-05-28 11:15:00 1.17472 1.17517 1.17217 1.17517
2003-05-28 11:20:00 1.17518 1.17537 1.17493 1.17493
2003-05-28 11:25:00 1.17493 1.17532 1.17469 1.17511
2003-05-28 11:30:00 1.17504 1.17504 1.17436 1.17469
2003-05-28 11:35:00 1.17471 1.17563 1.17471 1.17558
2003-05-28 11:40:00 1.17559 1.17793 1.17528 1.17529
2003-05-28 11:45:00 1.17519 1.17559 1.17506 1.17547
2003-05-28 11:50:00 1.17558 1.17572 1.17533 1.17533
2003-05-28 11:55:00 1.17536 1.17536 1.17307 1.17307
2003-05-28 12:00:00 1.17268 1.17268 1.17166 1.17167
2003-05-28 12:05:00 1.17168 1.17237 1.17147 1.17224
2003-05-28 12:10:00 1.17214 1.17228 1.17194 1.17217
2003-05-28 12:15:00 1.17224 1.17237 1.17187 1.17209
2003-05-28 12:20:00 1.17205 1.17229 1.17153 1.17153
2003-05-28 12:25:00 1.17121 1.17226 1.17047 1.17204
2003-05-28 12:30:00 1.17208 1.1726 1.17181 1.17259
2003-05-28 12:35:00 1.17261 1.17264 1.17182 1.17192
2003-05-28 12:40:00 1.17203 1.17337 1.17189 1.17337
2003-05-28 12:45:00 1.17351 1.1736 1.1714 1.17153

Si el campo «date» es una columna y el dataframe se llama «data», por ejemplo, tendríamos que escribir lo siguiente para poder usarlo como argumento, pues el  índice ha de ser datetime:

data = data.set_index("date")

Por supuesto el código de la clase anterior se puede modificar para funcionar con numpy o cualquier otra biblioteca.

La aplicación

Si queremos obtener los máximos y mínimos locales de los valores «high» de nuestra serie de datos utilizando 6 valores para calcular los picos y valles, es tan simple como hacer esto:

#"data" es un DataFrame que contiene los datos de la tabla de arriba.
fr = fractals()
high_fractals = fr.getFractals(data=data,column_mode="high", depth=3)

El resultado se cargaría en la variable high_fractals y sería el siguiente:

Y para extraer los máximos, por ejemplo, solo bastaría ejecutar el siguiente código:

Con una profundidad de 2, éste sería el resultado para los datos:

import matplotlib.pyplot as plt
 
fr = fractals() 
#"data" es un DataFrame que contiene los datos de la tabla de arriba. fr = fractals() 
high_fractals = fr.getFractals(data=data,column_mode="high", depth=2)
 
fig, ax = plt.subplots()
fig.set_size_inches(10, 5)
plt.plot(high_fractals["valleys"].dropna(), marker='^', linestyle="none", markersize=7,  color="red")
plt.plot(high_fractals["peaks"].dropna(), marker='v', linestyle="none", markersize=7,  color="green")
plt.plot(data.high, color="black")
plt.show()

El resultado es más que satisfactorio para la mayoría de escenarios.  En mi caso, lo voy a mejorar añadiendo también identificación de movimientos a partir de cierto nivel (porcentaje, pips, etc). Pero, en general, este sencillo algoritmo cumple con todas las expectativas que personalmente necesito para marcar cambios direccionales en series temporales. Menos es más.

Si en vez de mostrar la serie sobre la que hemos obtenido los picos y valles, empleamos toda la serie de precios, el resultado será mucho más obvio. Para el siguiente ejemplo, vamos a utilizar un total de 200 puntos, en vez de los 50 de antes.

#cargamos un dataframe con 200 datos.
data = all_historical_data.dropna()[5000:5200]
 
fr = fractals()
 
high_fractals = fr.getFractals(data=data,column_mode="high", depth=3)
high_fractals["fractal_type"]="highs"
high_fractals=high_fractals.reset_index().set_index(["fractal_type","date"])
 
low_fractals = fr.getFractals(data=data,column_mode="low", depth=3)
low_fractals["fractal_type"]="lows"
low_fractals=low_fractals.reset_index().set_index(["fractal_type","date"])
 
# Vamos a incluir una serie de librerías que nos interesan.
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import matplotlib.dates as mdates
from mpl_finance import candlestick_ohlc
from datetime import date
import datetime
 
fig, ax1 = plt.subplots(figsize=(20,5))
ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S"))
 
#Creamos un conjunto de velas japonesas.
number_of_ticks = data.shape[0]
dates = data.index
x=0
ohlc=[]
while x < len(dates):
    d = mdates.date2num(dates[x])
    append_me = d, data["open"].values[x], data["high"].values[x], data["low"].values[x], data["close"].values[x]
    ohlc.append(append_me)
    x += 1
 
# Añadimos bordes a las velas.
lines, patches = candlestick_ohlc(ax1, ohlc, width=0.2*2.5/number_of_ticks, colorup="green", colordown="red")
for line, patch in zip(lines, patches):
    patch.set_edgecolor("black")
    patch.set_linewidth(0.9)
    patch.set_antialiased(False)
    line.set_color("black")
    line.set_zorder(0)
 
#Seleccionamos los máximos de los Highs (Higher Highs) y los mínimos de los lows (Lower Lows)
hfs = high_fractals.loc["highs"].drop("valleys",axis=1).dropna()
lfs = low_fractals.loc["lows"].drop("peaks",axis=1).dropna()
 
plt.plot(hfs, marker="o", linestyle="None")
plt.plot(lfs, marker="o", linestyle="None")
plt.savefig("fractals.png")
plt.show()

Y el resultado es la gráfica siguiente:

¿Y qué se puede hacer con estos puntos? Con dos puntos podemos pintar una línea, por ejemplo.

data = high_fractals["peaks"].dropna()
TF_seconds = 300 #timeframe de 5 minutos.
peaks = []
 
for i in range(1, data.shape[0]):
    peaks.append( { "type": "H", 
    "x1": data[i - 1:i].index, "y1": data[i - 1], 
    "x2": data[i:i + 1].index, "y2": data[i], 
    )
 
for point in peaks:
    diference = (point["x2"] - point['x1']).seconds[0]
    units = diference / TF_seconds
    slope = (point["y2"] - point["y1"]) / units
    slope = slope / 0.0001
    peaks[i]["slope"] = slope
    peaks[i]["units"] = units
    i += 1

Y he ahí todos los vectores directores de todas las líneas de todos los picos consecutivos. Con meta-información relativa a su pendiente e incremento de X en formato escalar (aparte de datetime). La imagen de abajo ilustra la estructura de la implementación completa, donde la pendiente está calculada en pips:

La imaginación es el límite. Podríamos utilizar estos datos para analizar varias características de nuestros vectores directores. Veamos un ejemplo con la implementación completa. Para ello vamos a unir en un solo dataframe todos los fractales y todos los vectores directores en segundo lugar:

all_analyzed_data=pd.concat([high_fractals,low_fractals])

Lo que nos generaría una estructura similar a ésta:

Con los datos de los fractales, vamos a generar los vectores directores entre cada punto y todos los puntos posteriores al mismo.

Digits=4
# Low peaks -> the lower high points
# Low valleys -> the lower low points
# High peaks -> the Higher high points
# High valleys -> THe Higher high points
 
tl_higher_highs = Trendlines(dataframe=all_analyzed_data.loc["highs"],column_mode="peaks",seconds=300, digits=Digits)
tl_higher_lows = Trendlines(dataframe=all_analyzed_data.loc["highs"],column_mode="valleys",seconds=300, digits=Digits)
 
tl_lower_lows = Trendlines(dataframe=all_analyzed_data.loc["lows"],column_mode="peaks",seconds=300, digits=Digits)
tl_lower_highs = Trendlines(dataframe=all_analyzed_data.loc["lows"],column_mode="valleys",seconds=300, digits=Digits)

Cada una de las cuatro variables, tendría una estructura similar a ésta:

Cada fila de la tabla anterior representa un vector director que une dos puntos de los identificados con el algoritmo de fractales del principio. Como estoy analizando los Higher Highs, me voy a centrar en los picos. Los datos del dataframe incluyen:

  • Tipo de vector: valle o pico.
  • x1: fecha del primer punto.
  • y1: precio del primer punto.
  • x2, y2: fecha y precio del segundo punto.
  • height: Altura entre y1 e y2.
  • pips: Altura entre y1 e y2 en pips.
  • slope: pendiente del vector en pips por unidades temporales (pips/5m en este caso).
  • width: Distancia horizontal en número de unidades temporales entre el punto 1 y el punto 2 (cada unidad temporal son 5 minutos en este caso).

Quitamos información innecesaria y preparamos el dataset para su análisis.

data_hhs = tl_higher_highs.getFractalsVectors()
data_hhs_peaks=data_hhs.reset_index().drop(["x1","x2","index_type"],axis=1)

Con lo que ahora tenemos el siguiente DataFrame:

Veamos algunas características. Como la distribución de número de pips de cada vector director.

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
from scipy import stats
 
sns.distplot(data_hhs_peaks["pips"].values)
plt.show()

Comparar la altura en pips de los puntos con la pendiente de la recta que los une.

df = data_hhs_peaks[["pips","slope"]]
#df = df[df.slope.apply(abs)<=1.5]
df.columns = ["x","y"]
 
g = sns.jointplot(x="x", y="y", data=df, kind="kde", color="brown")
g.plot_joint(plt.scatter, c="b", s=30, linewidth=1, marker="+")
g.ax_joint.collections[0].set_alpha(0)
g.set_axis_labels("$Pendiente$", "$Altura$");

Y comprender mejor el entorno sobre el que estamos trabajando. Porque no es sino con la comprensión profunda del problema, con lo que más oportunidades tenemos para encontrar soluciones óptimas.

Entornos de Inteligencia Artificial para Trading y Simulando Ticks

A estas alturas, si has estado intentando diseñar modelos de machine learning  o inteligencia artificial para hacer trading, ya te habrás dado cuenta de que la cosa no es tan fácil como normalizar unos cuantos datos históricos de Forex o Bolsa, meterlos en una red neuronal y esperar que tu output tipo {mantener, comprar, vender} te salga lo suficiente bien balanceado como para que la red negocie por ti.

De hecho, en el pasado, mi experiencia es que arquitecturas como XGBOOST daban mejores predicciones con datos suavizados con medias móviles de 2 o 3 muestras que perceptrones o LSTMs.

Todo depende del tiempo que uno tiene disponible. Si es poco, como en mi caso, lo mejor es ir primero a los más seguro y luego ir refinando el trabajo para poder, con la mayor brevedad posible, aprobar o descartar una metodología. Lo de «la mayor brevedad» es un concepto bastante relativo.

En mi caso, la mayor brevedad han sido unos cinco años de trabajo intenso más un esfuerzo financiero relevante. En mi caso, como ya he comentado en otras entradas, he estudiado casi todas las técnicas de trading, las he probado, validado; una tras otra, durante años. Tanto el trading manual como el trading algorítmico. Es decir, que no solo he estudiado trading o pagado por aprender, sino que además he tenido que investigar mucho sobre algoritmos, estructuras, lenguajes, estadística (incluyendo un master en Ciencia de Datos), probabilidad, machine learning, inteligencia artificial y otras materias relevantes.

En ese sentido, se puede decir que me he hecho una idea genérica relativamente amplia del asunto. Escribo este rollo, no para ponerme medallas, sino para que quien lea este texto entienda que el trading no es llegar y triunfar. En el mejor de los casos hacen falta entre 3 o 5 años para empezar a entenderlo y puede que la tasa de fracaso sea del 95%. Esto es, que de cada 100 personas que intentan hacer trading, solo 5 conseguiran cubrir costes y un porcentaje menor ganar algo. Y no solo depende de lo buena que sea la técnica utilizada, el 80% de los factores van a ser ajenos al dominio de una técnica de trading determinada. De hecho, no me pongo medallas porque no enseño mis técnicas gratuitamente nunca. Tengo por ahí una mención a un seminario de 5 días de trading por 12000 euros por si alguien tiene curiosidad por saber qué principios y técnicas utilizo.

Lo que voy a mostrar ahora es una implementación personalizada de un proveedor de datos virtual; que es más sencillo de programar que un canal de comunicación con mi broker, de forma que puedo hacer simulaciones y estudios exactamente igual que si estuviera conectado, pero con mucho menos esfuerzo. Con bastante esfuerzo, pero con mucho menos.

A estas alturas ya funciona razonablemente bien, es un borrador del final, pero ya tiene suficiente complejidad para que valga la pena como ejemplo. Me he programado un broker virtual también, porque me gusta la modularidad. Este broker puede recibir datos de distintas fuentes. Para ello, uso una interfazclase abstracta (depende del lenguaje) que define unos métodos y propiedades iguales, sea cual sea la fuente de datos.

Una de las fuentes de datos que utilizo son ficheros csv en formato tick (date, ask, bid). Estos los he convertido en OHLC para Ask y OHLC para bid en marcos temporales de 1-minuto y 5-minutos. Para mí es suficiente, pero si alguien quiere más granularidad y exactitud, puede usar directamente los datos ask-bid, o hacer un muestreo a 30-segundos, 10-segundos, etc. para los datos tick. Para que se entienda mejor, tengo:

  1. Un único csv con todos los datos OHLC en el marco temporal de 5-minutos entre 2003 y 2019. Este fichero se utiliza para dibujar la gráfica OHLC, entrenar un modelo estadístico o lo que sea.
  2. Un fichero por año de datos OHLC en el marco temporal de 1-minuto. Este fichero se utiliza para extraer los ticks que tendrán lugar hasta que la siguiente barra OHLC se tenga que dibujar. Son como los datos que nos van llegando del broker en tiempo real y muestran cómo va cambiando el precio del instrumento.

Como observación, estuve decidiendo si utilizar datos tick ASK/BID, pero me pareció que tanta precisión no era necesaria y que iba a utilizar un generador de ruído para calcular el spread mediante un generador de números aleatorios con una distribución T de student o bien promediando High-Low de mi muestra histórica. Pero eso es otra historia. Recordemos que compramos a precio Ask, pero vendemos a precio Bid. La comisión por operación también tiene que tenerse en cuenta, por ejemplo.

Vamos a plantear el sistema:

Datos históricos en 5 minutos
(última fila, núm. filas=n)
(AMD = año-mes-día)

Datos en «tiempo real» en 1 minuto
(todos los ticks desde que cierra la vela histórica hasta que abre la siguiente).
Date Open High Low Close
AMD 08:00:00 On Hn Ln Cn

Tengamos en cuenta que la vela de 8:00 en el TF de 5m contiene todos los precios entre las 8:00 y las 8H 4' 59''.

Por lo que las señales en tiempo real tienen que empezar a las 8H 5' 00''.

Date Open High Low Close
AMD 08:05:00  O1 H1  L1  C
AMD 08:06:00 O2 H2  L2  C
AMD 08:07:00 O3 H3  L3  C
AMD 08:08:00 O4 H4  L4  C
08:09:00 O5 H5  L5  C

Empecemos con el código:

# Como está orientado a csv, entonces no voy a incluir información sobre conexiones mediante sockets, puertos, ejecutables, etc...
# Definimos una clase abstracta que sirva de interfaz para el resto que vengan.
 
class Provider_abstract():
 
    def __init__(self):
        raise NotImplementedError
 
    def getRTPricesNext(self):
        raise NotImplementedError
 
    def getHistPricesNext(self):
        raise NotImplementedError
 
    def setTickDataPath(self):
        raise NotImplementedError
 
    def setHistoricalFile(self):
        raise NotImplementedError

Los métodos tendrán las siguientes funciones:

  • getRTPricesNext: Devuelve el dataframe con los ticks.
  • getHistPricesNext: Devuelve un histórico de precios.
  • setTickDataPath: El directorio donde están los datos tick.
  • setHistoricalFile: La ruta al fichero con los datos históricos.

Y ahora procedemos a la implementación, veamos el código completo primero.

from gym_gspfx.gspProviders.provider import Provider_abstract
import pandas as pd
 
# Reads TF data and generates tick data from the same or other data source.
# For instance, reads 5m data nd generates ticks from 1m data.
 
class Provider(Provider_abstract):
 
    def __init__(self, historical_file, tick_data_path, start_row=0, number_of_samples=300, \
                 tick_file_prefix="EURUSD_1M_"):
 
        self.min_year=None
        self.max_year=None
        self.__historical_file = ""
        self.__tick_data_path = ""
        self.setHistoricalFile(historical_file)
        self.setTickDataPath(tick_data_path)
        self.current_start_row_index = None
        self.number_of_samples = number_of_samples
        self.next_future_start_row = start_row
        self.last_closed_row = None
        self.next_future_row_to_deliver_index = 0
        self.next_two_future_rows_to_deliver = None
        self.__tick_data_files = {"EMPTY": None}
        self.tick_file_prefix=tick_file_prefix
 
 
    def setHistoricalFile(self,fileName):
        self.__historical_file = pd.read_csv(fileName, parse_dates=["date"]).set_index("date")
 
    def setTickDataPath(self, tick_data_path):
        self.__tick_data_path=tick_data_path
 
 
    def getHistPricesNext(self):
        self.current_start_row_index = self.next_future_row_to_deliver_index
        self.next_future_row_to_deliver_index += 1
 
        __df = self.__historical_file.iloc[self.current_start_row_index:self.current_start_row_index + self.number_of_samples]
 
        self.last_closed_row = __df.tail(1)
 
        self.next_two_future_rows_to_deliver = pd.DataFrame( \
            self.__historical_file.iloc[self.current_start_row_index + self.number_of_samples:\
                                        self.next_future_row_to_deliver_index + self.number_of_samples + 1])
        return __df.copy()
 
 
    def getRTPricesNext(self):
 
        if self.last_closed_row is None:
            return None
 
        if "EMPTY" in self.__tick_data_files.keys() and self.__historical_file.shape[0]>0:
            del self.__tick_data_files["EMPTY"]
 
        self.min_year = self.last_closed_row.index.year.values[0]
        self.max_year = self.next_two_future_rows_to_deliver.head(1).index.year.values[0]
 
        if self.max_year not in self.__tick_data_files.keys():
            self.__tick_data_files[self.max_year] = \
                pd.read_csv(self.__tick_data_path + "EURUSD_1M_" + str(self.max_year) + ".csv", parse_dates=["date"])
 
            if self.min_year in self.__tick_data_files.keys() and self.min_year != self.max_year:
                del self.__tick_data_files[self.min_year]
 
        initial_date = self.next_two_future_rows_to_deliver.head(1).index.values[0]
        end_date = self.next_two_future_rows_to_deliver.tail(1).index.values[0]
 
        __df = self.__tick_data_files[self.max_year]
        __df = __df[(__df["date"] >= initial_date) & (__df["date"] < end_date)]
 
        return __df.copy()

El método __init__ se encarga de parametrizar los valores iniciales de la clase, como la fila del histórico desde el que se comenzará a extraer datos, el número de filas del histórico que se extraerán. También se han de definir al instanciar la clase la ruta completa al fichero de históricos y la carpeta donde están los ficheros de ticks.

Los ficheros de ticks los guardo en un diccionario conforme los necesito por motivos prácticos. He optado por usar ficheros de 1m, pero podría tener otra estructura y he preferido prever la posibilidad de tener que utilizar ficheros de tick múltiples. Todo ello se define en el constructor, junto con atributos que serán utilizados por la clase.

 def __init__(self, historical_file, tick_data_path, start_row=0, number_of_samples=300, \
                 tick_file_prefix="EURUSD_1M_"):
 
        self.min_year=None
        self.max_year=None
        self.__historical_file = ""
        self.__tick_data_path = ""
        self.setHistoricalFile(historical_file)
        self.setTickDataPath(tick_data_path)
        self.current_start_row_index = None
        self.number_of_samples = number_of_samples
        self.next_future_start_row = start_row
        self.last_closed_row = None
        self.next_future_row_to_deliver_index = 0
        self.next_two_future_rows_to_deliver = None
        self.__tick_data_files = {"EMPTY": None}
        self.tick_file_prefix=tick_file_prefix

Tras ello, los métodos que cargan el fichero de históricos y configuran la ruta de los ficheros de ticks.
Nótese que utilizo la librería de pandas y que asumo que los datos vienen ya ordenados por fecha, ascendente.

    def setHistoricalFile(self,fileName):
        self.__historical_file = pd.read_csv(fileName, parse_dates=["date"]).set_index("date")
 
   def setTickDataPath(self, tick_data_path):
       self.__tick_data_path=tick_data_path

Cada vez que queremos avanzar 1 vela, tenemos que llamara a este procedimiento.
El procedimiento devuelve un DataFrame con el número de filas indicado en el constructor o modificado en el atributo number_of_samples.
Cada vez que se le llama, avanza 1 fila.

    def getHistPricesNext(self):
        self.current_start_row_index = self.next_future_row_to_deliver_index
        self.next_future_row_to_deliver_index += 1
 
        __df = self.__historical_file.iloc[self.current_start_row_index:self.current_start_row_index + self.number_of_samples]
 
        self.last_closed_row = __df.tail(1)
 
        self.next_two_future_rows_to_deliver = pd.DataFrame( \
            self.__historical_file.iloc[self.current_start_row_index + self.number_of_samples:\
                                        self.next_future_row_to_deliver_index + self.number_of_samples + 1])
        return __df.copy()

Por útlimo, tenemos el método que nos devuelve todos los ticks que se producirán desde el cierre de la última vela de los históricos hasta justo antes de la apertura del siguiente.

   def getRTPricesNext(self):
 
        if self.last_closed_row is None:
            return None
 
        if "EMPTY" in self.__tick_data_files.keys() and self.__historical_file.shape[0]>0:
            del self.__tick_data_files["EMPTY"]
 
        self.min_year = self.last_closed_row.index.year.values[0]
        self.max_year = self.next_two_future_rows_to_deliver.head(1).index.year.values[0]
 
        if self.max_year not in self.__tick_data_files.keys():
            self.__tick_data_files[self.max_year] = \
                pd.read_csv(self.__tick_data_path + "EURUSD_1M_" + str(self.max_year) + ".csv", parse_dates=["date"])
 
            if self.min_year in self.__tick_data_files.keys() and self.min_year != self.max_year:
                del self.__tick_data_files[self.min_year]
 
        initial_date = self.next_two_future_rows_to_deliver.head(1).index.values[0]
        end_date = self.next_two_future_rows_to_deliver.tail(1).index.values[0]
 
        __df = self.__tick_data_files[self.max_year]
        __df = __df[(__df["date"] >= initial_date) & (__df["date"] < end_date)]
 
        return __df.copy()

Estudiando el código anterior se puede hacer una idea de cómo realizar un proveedor de ticks y datos financieros personalizado, así como adaptarlo a las necesidades propias. Una aplicación puede ser un simulador de trading por ejemplo. Yo lo uso para mis proyectos de inteligencia artificial, de manera que no necesito estar conectado a ningún proveedor de precios online para simular series temporales y mercados financieros.

En la siguiente imagen se muestra cómo usar la clase creada con un ejemplo real.

Generador de datos históricos y tick en Python

Y si volvemos a ejecutar los métodos, obtenemos la vela siguiente con sus históricos y los ticks correspondientes.

Si alguien no conoce la librería Pandas y no entiende alguna de las expresiones del código, se pueden consultar en la documentación oficial, en el siguiente enlace: http://pandas.pydata.org/pandas-docs/stable/

Y preguntados:

  • ¿Cómo puede obtenerse algo negociando si no es desarrollando una técnica para acertar las suficientes veces como para cubrir los costes de los fallos?
  • ¿Cómo puede mantenerse un negocio sin conocer el riesgo asociado y una buena gestión financiera y de riesgos conforme?
  • ¿cómo podría un cerebro humano ser mejor calculando aspectos técnicos y numéricos que una compleja computadora?