Accidentally Discovered an Interesting Pine Strategy
A few days ago, while browsing strategies on the Inventor forum, I came across one called "Panel Pro+ Quantum SmartPrompt". After reviewing the code, I found this strategy's approach quite interesting: it uses 10 technical indicators, assigns different weights to each indicator based on market conditions, and finally calculates a score to determine buy/sell decisions. For example, in a bull market state, trend indicators have a weight of 2.0 and RSI has a weight of 1.5; in a bear market state, the weights are different. It feels like it's mimicking human thinking: focusing on different aspects under different circumstances.
Upon closer examination, this structure is quite similar to a neural network:
- 10 technical indicators as inputs
- Market state classification acts like a hidden layer
- Weight matrices serve as connection weights
- Finally outputs a score
However, the problem is that all weights are hardcoded, for example:
pine
if marketType == "Bull"
    array.set(weights, 0, 2.0) // Trend weight is fixed at 2.0
    array.set(weights, 1, 1.5) // RSI weight is fixed at 1.5
These numbers are completely fixed by the author based on market experience, without any learning or optimization.
Idea: Make the Weights Learnable
Since the structure already resembles a neural network, why not make it truly capable of learning?
My idea is simple:
1. Keep the original weight calculation method to get a "weight score"
2. Use this weight score as input to train a small neural network
3. Let the network learn to predict future returns from the weight score
4. Decide whether to open positions based on the predicted return magnitude
This way, we preserve the original strategy's logic while adding learning capability.
Implementation on the Inventor Platform
I chose the Inventor platform mainly because it supports Python and contains rich data.
Step 1: Rewrite Technical Indicators
I rewrote all indicators from the Pine script in Python, using the talib library to ensure calculation accuracy. This includes common indicators like EMA, MACD, RSI, ATR, as well as volume analysis and simple candlestick pattern recognition.
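To make the indicator step concrete, here is a minimal sketch of two of the indicators in plain NumPy. It is not the talib implementation used in the strategy: the function names `ema` and `rsi` and the synthetic price series are assumptions for illustration, and the seeding and smoothing are simplified, so the values can differ from talib's.

```python
import numpy as np

def ema(values, period):
    """Exponential moving average, seeded with the first value (simplified)."""
    values = np.asarray(values, dtype=float)
    alpha = 2.0 / (period + 1)
    out = np.empty_like(values)
    out[0] = values[0]
    for i in range(1, len(values)):
        out[i] = alpha * values[i] + (1 - alpha) * out[i - 1]
    return out

def rsi(values, period=14):
    """RSI from simple averages of gains and losses over the last `period` changes."""
    values = np.asarray(values, dtype=float)
    changes = np.diff(values[-(period + 1):])
    avg_gain = changes[changes > 0].sum() / period
    avg_loss = -changes[changes < 0].sum() / period
    if avg_loss == 0:
        return 100.0  # no losing candles in the window
    return 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)

# Synthetic, steadily rising closes (illustrative data only)
closes = np.linspace(100.0, 110.0, 60)
print(ema(closes, 55)[-1], rsi(closes))
```

In a rising series like this one, the EMA lags below the latest close and the RSI saturates, which is the kind of sanity check worth running before trusting a ported indicator.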
Step 2: Market State Detection
Following the original strategy's logic, I determine market types (Bull, Bear, Eagle, Wolf, etc.) based on combinations of various indicators. This part is basically a combination of if-else logic.
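The if-else structure can be sketched roughly as follows. This is a simplified version that covers only four of the market types and leaves out the volume conditions; the function name `detect_market_type` and its scalar arguments are assumptions for illustration, and the thresholds follow the full listing at the end of this post.

```python
def detect_market_type(close, ema55, macd, signal, rsi, atr, atr_avg):
    """Classify the market from the latest indicator values (simplified sketch)."""
    is_bull = close > ema55 and macd > signal and rsi > 50
    is_bear = close < ema55 and macd < signal and rsi < 50
    is_sideways = abs(close - ema55) < 0.5 * atr and atr < atr_avg

    # Order matters: the more specific state is checked first
    if is_bull and atr < 0.8 * atr_avg:
        return "Eagle"      # bullish with unusually low volatility
    if is_bull:
        return "Bull"
    if is_bear:
        return "Bear"
    if is_sideways:
        return "Sideways"
    return "Unknown"

print(detect_market_type(105, 100, 1.0, 0.5, 60, 2.0, 2.0))
```

Because the checks are ordered, a candle that satisfies several conditions is always assigned the first matching state.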
Step 3: Weight Score Calculation
This is the core part. I set up two sets of weights:
- Base weights: [2.0, 1.5, 2.0, 1.3, 1.2, ...]
- Market weights: adjusted according to different market states
Final weight = Base weight × Market weight
Then I use these weights to calculate a weighted sum of the 10 indicators' raw scores to get the "weight score."
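As a rough sketch of this calculation: the first five base weights below come from the list above, while the remaining base weights, the market multipliers, and the raw scores are placeholder values chosen only to make the example run. The name `weight_score` is also an assumption.

```python
import numpy as np

# First five values are from the text; the last five are placeholders
BASE_WEIGHTS = np.array([2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.0, 1.0, 1.0, 1.0])

# Hypothetical per-market multipliers (illustrative, not the strategy's values)
MARKET_WEIGHTS = {
    "Bull": np.array([1.1, 0.9, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]),
    "Neutral": np.ones(10),
}

def weight_score(raw_scores, market_type):
    """Weighted sum of the 10 raw indicator scores."""
    final_weights = BASE_WEIGHTS * MARKET_WEIGHTS[market_type]
    return float(np.dot(final_weights, raw_scores))

# Hypothetical raw scores for the 10 indicators
raw_scores = np.array([20, 0, 10, 8, 7, 6, 0, 5, 5, 0], dtype=float)
print(weight_score(raw_scores, "Bull"))
```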
Step 4: Neural Network Predictor
I built a simple network:
- Input: 1 feature (weight score)
- Hidden layer: 16 neurons with ReLU activation
- Output: predicted return, constrained to ±5% using tanh
Training objective: Use the weight score at time t-1 to predict the price change at time t.
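A minimal sketch of the forward pass for a network of this shape, with untrained random weights. The division of the weight score by 100 is an assumed input scaling, and the name `predict_return` is hypothetical; the point is only to show how tanh bounds the output.

```python
import numpy as np

rng = np.random.default_rng(0)
W1 = rng.normal(0.0, 0.1, size=(1, 16))   # input -> hidden
b1 = np.zeros((1, 16))
W2 = rng.normal(0.0, 0.1, size=(16, 1))   # hidden -> output
b2 = np.zeros((1, 1))

def predict_return(weight_score):
    """Map one weight score to a predicted return in [-0.05, 0.05]."""
    x = np.array([[weight_score / 100.0]])      # assumed input scaling
    hidden = np.maximum(0.0, x @ W1 + b1)       # ReLU
    return float(np.tanh(hidden @ W2 + b2)[0, 0] * 0.05)

print(predict_return(15.6))
```

Whatever the weights are, the output cannot leave the ±5% band, so the network cannot produce an absurd return forecast.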
Step 5: Trading Logic
Instead of buying/selling directly based on score levels, I now look at predicted returns:
- Predicted return > 1.5%: go long or close short and go long
- Predicted return < -1.5%: go short or close long and go short
- Other cases: maintain current position
I also keep stop-loss and take-profit mechanisms to ensure controllable risk.
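The threshold rule can be sketched as a small pure function. The name `decide`, the position labels, and the action strings are assumptions for illustration; order placement and the stop-loss and take-profit checks are left out.

```python
def decide(predicted_return, position, threshold=0.015):
    """Return the list of actions for a position of 'long', 'short' or 'flat'."""
    if predicted_return > threshold:
        if position == "long":
            return []                               # already positioned
        if position == "short":
            return ["close_short", "open_long"]     # reverse the position
        return ["open_long"]
    if predicted_return < -threshold:
        if position == "short":
            return []
        if position == "long":
            return ["close_long", "open_short"]
        return ["open_short"]
    return []                                       # weak signal: keep position

print(decide(0.02, "short"))
```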
Observations from Actual Operation
Data Collection
The strategy can normally collect training data. Each time there's a new candlestick, it uses the previous candlestick's weight score as a feature and the current candlestick's percentage change relative to the previous one as the label.
The data looks roughly like this:
Weight Score = 15.6, Return = +0.8%
Weight Score = -8.2, Return = -1.2%
Weight Score = 22.1, Return = +0.3%
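A minimal sketch of how such pairs can be built from two aligned series, pairing each score with the return of the following candle. The function name `build_samples` and the sample closes are assumptions for illustration.

```python
import numpy as np

def build_samples(weight_scores, closes):
    """Pair the score at candle t-1 with the return from t-1 to t."""
    scores = np.asarray(weight_scores, dtype=float)
    closes = np.asarray(closes, dtype=float)
    returns = closes[1:] / closes[:-1] - 1.0   # return of each candle vs. the previous one
    X = scores[:-1].reshape(-1, 1)             # the last score has no label yet
    y = returns.reshape(-1, 1)
    return X, y

X, y = build_samples([15.6, -8.2, 22.1], [100.0, 101.0, 99.99])
print(X.ravel(), y.ravel())
```

The last score is dropped because its label, the next candle's return, does not exist yet. This keeps future information out of the features.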
Model Training
The neural network trains normally, with MSE loss gradually decreasing. I set it to retrain every 4 hours to ensure the model can adapt to market changes.
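To illustrate what "MSE loss gradually decreasing" means here, below is a small full-batch gradient descent sketch for a one-feature network of the shape described earlier. The data is synthetic, and the learning rate, epoch count, and initialization are illustrative assumptions rather than the settings used in the strategy.

```python
import numpy as np

def train_predictor(X, y, hidden=16, epochs=300, lr=5.0, seed=0):
    """Full-batch gradient descent on MSE; returns the loss at each epoch."""
    rng = np.random.default_rng(seed)
    W1 = rng.normal(0.0, 0.5, (X.shape[1], hidden))
    b1 = np.zeros((1, hidden))
    W2 = rng.normal(0.0, 0.5, (hidden, 1))
    b2 = np.zeros((1, 1))
    losses = []
    for _ in range(epochs):
        # Forward pass: ReLU hidden layer, tanh output scaled to +/-5%
        z1 = X @ W1 + b1
        a1 = np.maximum(0.0, z1)
        t = np.tanh(a1 @ W2 + b2)
        err = 0.05 * t - y
        losses.append(float(np.mean(err ** 2)))
        # Backward pass through the scaled tanh and the ReLU
        d_z2 = (2.0 * err / len(X)) * 0.05 * (1.0 - t ** 2)
        d_W2 = a1.T @ d_z2
        d_b2 = d_z2.sum(axis=0, keepdims=True)
        d_z1 = (d_z2 @ W2.T) * (z1 > 0)
        d_W1 = X.T @ d_z1
        d_b1 = d_z1.sum(axis=0, keepdims=True)
        W1 -= lr * d_W1
        b1 -= lr * d_b1
        W2 -= lr * d_W2
        b2 -= lr * d_b2
    return losses

# Synthetic data: scaled weight scores and noisy returns (illustrative only)
rng = np.random.default_rng(1)
X = rng.uniform(-0.5, 0.5, size=(200, 1))
y = 0.02 * X + rng.normal(0.0, 0.002, size=(200, 1))
losses = train_predictor(X, y)
print(f"first loss {losses[0]:.6f}, last loss {losses[-1]:.6f}")
```

A falling training loss only shows that the optimizer works. It says nothing about whether the predictions hold up on candles the model has not seen.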
Prediction Performance
The model's predictions do show some correlation with actual returns, but it's not particularly strong. Main issues:
1. Single feature is too simple, possibly insufficient information
2. Short-term price movements have high randomness
3. Futures markets have considerable noise
Trading Performance
Thanks to stop-loss and take-profit protection, risk control for individual trades is decent. However, overall profitability is mediocre, mainly due to insufficient prediction accuracy.
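The prediction quality mentioned under Prediction Performance can be quantified with two simple numbers: the correlation between predicted and actual returns, and the share of candles where the predicted direction was right. The sketch below uses made-up values; the function name `evaluate` is an assumption.

```python
import numpy as np

def evaluate(predicted, actual):
    """Return (correlation, directional hit rate) for two return series."""
    predicted = np.asarray(predicted, dtype=float)
    actual = np.asarray(actual, dtype=float)
    corr = np.corrcoef(predicted, actual)[0, 1]
    hit_rate = np.mean(np.sign(predicted) == np.sign(actual))
    return float(corr), float(hit_rate)

# Made-up predictions and outcomes, for illustration only
corr, hit_rate = evaluate([0.01, -0.02, 0.005, -0.001],
                          [0.008, -0.012, -0.003, -0.002])
print(corr, hit_rate)
```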
- Features Too Limited: Using only the weight score as a single feature is indeed too simple. Markets are so complex that one number can hardly capture everything.
- Unstable Sample Quality: Contract prices have large short-term fluctuations, and often the ups and downs are actually random, making the quality of training samples unstable.
- Overfitting Risk: Although the network is simple, there's still potential for overfitting when sample size is limited.
- Real-time Requirements: Online learning needs to balance training time with real-time performance.
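One simple way to watch for the overfitting risk above is to hold out the most recent samples and compare the error on them with the training error. Below is a minimal sketch of a chronological split; the function name and the 80/20 ratio are assumptions. The split is by time rather than random, so the validation samples are always later than the training samples.

```python
import numpy as np

def chronological_split(X, y, train_fraction=0.8):
    """Split time-ordered samples into an earlier training part and a later validation part."""
    n_train = int(len(X) * train_fraction)
    return X[:n_train], y[:n_train], X[n_train:], y[n_train:]

# Ten time-ordered dummy samples
X = np.arange(10, dtype=float).reshape(-1, 1)
y = np.arange(10, dtype=float).reshape(-1, 1)
X_train, y_train, X_val, y_val = chronological_split(X, y)
print(len(X_train), len(X_val))
```

If the validation error is much larger than the training error, the network is probably memorizing noise rather than learning anything that carries forward.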
Limited Time, Insufficient Optimization
This strategy has many areas for improvement, but with limited time and energy, I couldn't optimize it deeply:
Feature Aspects: Could add more technical indicators or use statistical features of price sequences.
Model Aspects: Could try sequence models like LSTM, or use ensemble methods with multiple models.
Data Aspects: Improve sample quality and add data cleaning.
Risk Management: Refine dynamic stop-loss and optimize position management.
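As one possible direction for the feature side, the sketch below derives three statistical features from a close-price sequence: mean return, return volatility, and momentum over a window. I have not tested this in the strategy; the function name, the window length, and the choice of features are all assumptions.

```python
import numpy as np

def price_features(closes, window=20):
    """Mean return, return volatility and momentum over the last `window` candles.

    Requires at least window + 1 closes.
    """
    closes = np.asarray(closes, dtype=float)
    returns = np.diff(closes) / closes[:-1]
    recent = returns[-window:]
    momentum = closes[-1] / closes[-window - 1] - 1.0
    return np.array([recent.mean(), recent.std(), momentum])

# Synthetic closes growing 1% per candle (illustrative data only)
closes = 100.0 * 1.01 ** np.arange(30)
features = price_features(closes)
print(features)
```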
Gains and Reflections
This exploration taught me an important lesson: good inspiration is all about timely implementation! When I saw the weight matrix design in the Pine script, I immediately thought of the possibility of improving it with neural networks. If I had just thought about it without taking action, or procrastinated, this idea would likely have been forgotten.
Fortunately, the Inventor platform provided a Python environment and data interfaces, allowing me to quickly turn the idea into runnable code. From generating the idea to completing the basic implementation took only about a day.
Although the final strategy's performance was mediocre, running it in practice at least validated that this approach was feasible. More importantly, new ideas and improvement directions emerged during the implementation process. Without taking action promptly, none of these subsequent discoveries and thoughts would have occurred.
Theoretical discussions can never compare to actually writing code, running data, and examining results. Quantitative trading is like this - there are many ideas, but truly valuable ones are those that get quickly implemented and validated.
python
'''backtest
start: 2025-07-31 00:00:00
end: 2025-08-07 00:00:00
period: 1h
basePeriod: 5m
exchanges: [{"eid":"Futures_Binance","currency":"ETH_USDT","balance":5000000,"fee":[0.01,0.01]}]
'''
import numpy as np
from collections import deque
import talib as TA
# ========== Exception Classes ==========
class Error_noSupport(BaseException):
    def __init__(self):
        Log("Only futures trading is supported! #FF0000")


class Error_AtBeginHasPosition(BaseException):
    def __init__(self):
        Log("Futures position exists at startup! #FF0000")


# ========== Return Prediction Neural Network ==========
class ReturnPredictor:
    def __init__(self, input_size=10, hidden_size=20, output_size=1):
        """Return prediction network: X[t] -> y[t+1] (return rate)"""
        self.W1 = np.random.randn(input_size, hidden_size) * 0.1
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size) * 0.1
        self.b2 = np.zeros((1, output_size))
        self.learning_rate = 0.001

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -250, 250)))

    def tanh(self, x):
        return np.tanh(x)

    def forward(self, X):
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = self.sigmoid(self.z1)
        self.z2 = np.dot(self.a1, self.W2) + self.b2
        # Output predicted return rate, using tanh to limit to reasonable range
        self.a2 = self.tanh(self.z2) * 0.1  # Limit to ±10% range
        return self.a2

    def backward(self, X, y, output):
        m = X.shape[0]
        # MSE loss gradient
        dZ2 = (output - y) / m
        # tanh derivative
        tanh_derivative = 1 - (output / 0.1) ** 2
        dZ2 = dZ2 * 0.1 * tanh_derivative
        dW2 = np.dot(self.a1.T, dZ2)
        db2 = np.sum(dZ2, axis=0, keepdims=True)
        dA1 = np.dot(dZ2, self.W2.T)
        dZ1 = dA1 * self.a1 * (1 - self.a1)  # sigmoid derivative
        dW1 = np.dot(X.T, dZ1)
        db1 = np.sum(dZ1, axis=0, keepdims=True)
        # Update weights
        self.W2 -= self.learning_rate * dW2
        self.b2 -= self.learning_rate * db2
        self.W1 -= self.learning_rate * dW1
        self.b1 -= self.learning_rate * db1

    def train(self, X, y, epochs=100):
        for i in range(epochs):
            output = self.forward(X)
            self.backward(X, y, output)
            if i % 20 == 0:
                loss = np.mean((output - y) ** 2)
                Log(f"Return prediction training epoch {i}, MSE loss: {loss:.6f}")

    def predict(self, X):
        return self.forward(X)
# ========== Technical Indicators Calculation Class ==========
class TechnicalIndicators:
    @staticmethod
    def calculate_indicators(records, use_completed_only=True):
        """Calculate technical indicators and features"""
        if len(records) < 55:
            return None, None
        # Only use completed candlestick data
        if use_completed_only and len(records) > 1:
            working_records = records[:-1]
        else:
            working_records = records
        if len(working_records) < 55:
            return None, None
        closes = np.array([r['Close'] for r in working_records])
        highs = np.array([r['High'] for r in working_records])
        lows = np.array([r['Low'] for r in working_records])
        volumes = np.array([r['Volume'] for r in working_records])
        opens = np.array([r['Open'] for r in working_records])
        try:
            # Basic indicators
            ema_55 = TA.EMA(closes, timeperiod=55)
            sma_vol20 = TA.SMA(volumes, timeperiod=20)
            macd, signal_line, _ = TA.MACD(closes)
            rsi_val = TA.RSI(closes, timeperiod=14)
            atr14 = TA.ATR(highs, lows, closes, timeperiod=14)
            range20 = TA.STDDEV(closes, timeperiod=20)
            # Calculate derived indicators
            sma_atr20 = TA.SMA(atr14, timeperiod=20)
            sma_range20 = TA.SMA(range20, timeperiod=20)
            rvol = volumes / sma_vol20 if sma_vol20[-1] > 0 else np.ones_like(volumes)
            delta = closes - opens
            # Calculate volume thresholds
            vol_abs_thresh = sma_vol20 * 1.2
            sniper_thresh = np.percentile(volumes[-40:], 80) if len(volumes) >= 40 else sma_vol20[-1]
            # Trend
            trend = np.where(closes > ema_55, 1, np.where(closes < ema_55, -1, 0))
            # Simplified candlestick patterns
            body_size = np.abs(closes - opens)
            total_range = highs - lows
            # Hammer pattern
            is_hammer = ((total_range > 3 * body_size) &
                         ((closes - lows) / (total_range + 0.001) > 0.6) &
                         ((opens - lows) / (total_range + 0.001) > 0.6))
            # Engulfing pattern
            is_engulfing = np.zeros_like(closes, dtype=bool)
            if len(closes) >= 2:
                is_engulfing[1:] = ((closes[1:] > opens[:-1]) &
                                    (opens[1:] < closes[:-1]) &
                                    (closes[1:] > opens[1:]) &
                                    (opens[1:] < closes[1:]))
            pattern = np.where(is_hammer, 1, np.where(is_engulfing, 2, 0))
            # 🔥 Calculate normalized feature vectors (for neural network input)
            features = []
            # 1. Trend feature
            if len(ema_55) > 0 and not np.isnan(ema_55[-1]):
                trend_feature = (closes[-1] - ema_55[-1]) / ema_55[-1]
                features.append(np.tanh(trend_feature * 100))
            else:
                features.append(0)
            # 2. RSI feature
            if len(rsi_val) > 0 and not np.isnan(rsi_val[-1]):
                rsi_feature = (rsi_val[-1] - 50) / 50
                features.append(rsi_feature)
            else:
                features.append(0)
            # 3. MACD feature
            if len(macd) > 0 and not np.isnan(macd[-1]) and not np.isnan(signal_line[-1]):
                macd_feature = (macd[-1] - signal_line[-1]) / closes[-1] if closes[-1] > 0 else 0
                features.append(np.tanh(macd_feature * 1000))
            else:
                features.append(0)
            # 4. Volume feature
            if len(vol_abs_thresh) > 0 and vol_abs_thresh[-1] > 0:
                vol_feature = volumes[-1] / vol_abs_thresh[-1] - 1
                features.append(np.tanh(vol_feature))
            else:
                features.append(0)
            # 5. Relative volume feature
            if len(rvol) > 0 and not np.isnan(rvol[-1]):
                rvol_feature = rvol[-1] - 1
                features.append(np.tanh(rvol_feature))
            else:
                features.append(0)
            # 6. Delta feature
            if len(delta) > 0 and not np.isnan(delta[-1]) and closes[-1] > 0:
                delta_feature = delta[-1] / closes[-1]
                features.append(np.tanh(delta_feature * 100))
            else:
                features.append(0)
            # 7. ATR feature
            if len(atr14) > 0 and len(sma_atr20) > 0 and sma_atr20[-1] > 0:
                atr_feature = atr14[-1] / sma_atr20[-1] - 1
                features.append(np.tanh(atr_feature))
            else:
                features.append(0)
            # 8. Blocks feature
            if len(volumes) >= 10:
                highest_vol = np.max(volumes[-10:])
                blocks_feature = volumes[-1] / highest_vol - 0.8 if highest_vol > 0 else 0
                features.append(np.tanh(blocks_feature * 5))
            else:
                features.append(0)
            # 9. Tick feature
            if len(sma_vol20) > 0 and sma_vol20[-1] > 0:
                tick_feature = volumes[-1] / sma_vol20[-1] - 1
                features.append(np.tanh(tick_feature))
            else:
                features.append(0)
            # 10. Pattern feature
            pattern_feature = pattern[-1] / 2.0 if len(pattern) > 0 else 0
            features.append(pattern_feature)
            # Ensure correct feature count
            while len(features) < 10:
                features.append(0)
            features = np.array(features[:10]).reshape(1, -1)
            indicators = {
                'ema_55': ema_55,
                'sma_vol20': sma_vol20,
                'macd': macd,
                'signal_line': signal_line,
                'rsi_val': rsi_val,
                'atr14': atr14,
                'range20': range20,
                'sma_atr20': sma_atr20,
                'sma_range20': sma_range20,
                'rvol': rvol,
                'delta': delta,
                'vol_abs_thresh': vol_abs_thresh,
                'sniper_thresh': sniper_thresh,
                'trend': trend,
                'pattern': pattern,
                'volumes': volumes,
                'closes': closes,
                'highs': highs,
                'lows': lows
            }
            return indicators, features
        except Exception as e:
            Log(f"Technical indicators calculation error: {str(e)}")
            return None, None
# ========== Market State Detection Class ==========
class MarketStateDetector:
    @staticmethod
    def detect_market_type(indicators):
        """Detect market state"""
        if indicators is None:
            return "Unknown"
        try:
            # Get latest values
            close = indicators['closes'][-1]
            ema_55 = indicators['ema_55'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            rsi_val = indicators['rsi_val'][-1]
            atr14 = indicators['atr14'][-1]
            volume = indicators['volumes'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            sma_atr20 = indicators['sma_atr20'][-1]
            range20 = indicators['range20'][-1]
            sma_range20 = indicators['sma_range20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]
            # Check validity
            if (np.isnan(ema_55) or np.isnan(macd) or np.isnan(signal_line) or
                    np.isnan(rsi_val) or np.isnan(atr14) or np.isnan(sma_atr20)):
                return "Unknown"
            # Market type judgment
            is_bull = (close > ema_55 and macd > signal_line and rsi_val > 50 and rvol > 1)
            is_bear = (close < ema_55 and macd < signal_line and rsi_val < 50 and volume > sma_vol20)
            is_sideways = (abs(close - ema_55) < atr14 * 0.5 and atr14 < sma_atr20)
            is_volatile = (atr14 > sma_atr20 * 1.2)
            # Judgments requiring historical data
            if len(indicators['closes']) >= 2:
                price_change = indicators['closes'][-1] - indicators['closes'][-2]
                is_momentum = (price_change > atr14 * 1.5 and volume > sma_vol20 * 1.5)
                is_wolf = (price_change < -atr14 and close < ema_55)
            else:
                is_momentum = False
                is_wolf = False
            is_mean_rev = (rsi_val > 70 or rsi_val < 30)
            is_box = (is_sideways and range20 < sma_range20 * 0.8)
            is_macro = (abs(delta) > atr14 * 2) if not np.isnan(delta) else False
            is_eagle = (is_bull and atr14 < sma_atr20 * 0.8)
            # Priority judgment
            if is_eagle:
                return "Eagle"
            elif is_bull:
                return "Bull"
            elif is_wolf:
                return "Wolf"
            elif is_bear:
                return "Bear"
            elif is_box:
                return "Box"
            elif is_sideways:
                return "Sideways"
            elif is_volatile:
                return "Volatile"
            elif is_momentum:
                return "Momentum"
            elif is_mean_rev:
                return "MeanRev"
            elif is_macro:
                return "Macro"
            else:
                return "Unknown"
        except Exception as e:
            Log(f"Market state detection error: {str(e)}")
            return "Unknown"
# ========== Dynamic Weight Generator ==========
class DynamicWeightGenerator:
    @staticmethod
    def generate_weights_from_predicted_return(predicted_return, market_type):
        """Generate dynamic weights based on predicted return rate and market state"""
        # Base weight matrix (different market types)
        base_weights_matrix = {
            "Bull": [2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.2, 1.0, 1.0, 1.0],
            "Bear": [2.0, 1.5, 2.0, 1.5, 1.3, 1.1, 1.2, 1.1, 1.0, 1.0],
            "Eagle": [2.2, 1.4, 2.1, 1.2, 1.3, 1.1, 1.1, 1.0, 1.0, 1.1],
            "Wolf": [1.8, 1.6, 1.8, 1.6, 1.2, 1.0, 1.3, 1.2, 1.0, 0.9],
            "Momentum": [1.5, 1.2, 1.8, 2.0, 2.0, 1.5, 1.5, 1.3, 1.2, 1.0],
            "Sideways": [1.0, 1.4, 1.0, 0.8, 0.7, 1.0, 0.9, 0.8, 1.0, 1.3],
            "Volatile": [1.2, 1.5, 1.3, 1.6, 1.8, 1.2, 1.4, 1.3, 1.4, 1.0],
        }
        base_weights = base_weights_matrix.get(market_type, [1.0] * 10)
        # 🔥 Dynamically adjust weights based on predicted return rate
        adjustment_factors = [1.0] * 10
        # Strength of predicted return rate
        return_strength = abs(predicted_return)
        return_direction = 1 if predicted_return > 0 else -1
        if return_strength > 0.02:  # Strong prediction signal > 2%
            if return_direction > 0:  # Predict upward
                adjustment_factors[0] *= 1.3   # Enhance trend weight
                adjustment_factors[2] *= 1.2   # Enhance MACD weight
                adjustment_factors[4] *= 1.15  # Enhance relative volume weight
                adjustment_factors[1] *= 0.9   # Reduce RSI weight
            else:  # Predict downward
                adjustment_factors[1] *= 1.3   # Enhance RSI weight
                adjustment_factors[3] *= 1.2   # Enhance volume weight
                adjustment_factors[0] *= 0.9   # Reduce trend weight
        elif return_strength > 0.01:  # Medium prediction signal 1%-2%
            if return_direction > 0:
                adjustment_factors[0] *= 1.15
                adjustment_factors[2] *= 1.1
            else:
                adjustment_factors[1] *= 1.15
                adjustment_factors[3] *= 1.1
        # Volatility adjustment
        if return_strength > 0.03:  # High volatility expectation > 3%
            adjustment_factors[4] *= 1.2   # Enhance relative volume weight
            adjustment_factors[6] *= 1.15  # Enhance sniper weight
            adjustment_factors[7] *= 1.1   # Enhance blocks weight
        # Generate final dynamic weights
        dynamic_weights = [base_weights[i] * adjustment_factors[i] for i in range(10)]
        # Weight normalization (optional)
        # total_weight = sum(dynamic_weights)
        # dynamic_weights = [w / total_weight * 10 for w in dynamic_weights]
        return dynamic_weights
# ========== Smart Scoring System ==========
class SmartScoringSystem:
    def __init__(self):
        self.return_predictor = ReturnPredictor()
        self.weight_generator = DynamicWeightGenerator()
        self.is_model_trained = False

    def calculate_score(self, indicators, market_type, features=None):
        """Calculate trading score (using dynamic weights from predicted return rate)"""
        if indicators is None:
            return 50.0
        try:
            # 🔥 Core logic: Use current indicators to predict next period return rate
            if self.is_model_trained and features is not None:
                predicted_return = self.return_predictor.predict(features)[0, 0]
            else:
                predicted_return = 0.0
                Log("Using base weights for calculation")
            # Generate dynamic weights based on predicted return rate
            dynamic_weights = self.weight_generator.generate_weights_from_predicted_return(
                predicted_return, market_type)
            # Get latest indicator values
            trend = indicators['trend'][-1]
            rsi_val = indicators['rsi_val'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            volume = indicators['volumes'][-1]
            vol_abs_thresh = indicators['vol_abs_thresh'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]
            sniper_thresh = indicators['sniper_thresh']
            pattern = indicators['pattern'][-1]
            # Calculate individual scores
            base_score = 0.0
            # 1. Trend score
            trend_score = 20 if trend == 1 else (-20 if trend == -1 else 0)
            base_score += trend_score * dynamic_weights[0]
            # 2. RSI score
            rsi_score = -10 if rsi_val > 70 else (10 if rsi_val < 30 else 0)
            base_score += rsi_score * dynamic_weights[1]
            # 3. MACD score
            macd_score = 10 if macd > signal_line else -10
            base_score += macd_score * dynamic_weights[2]
            # 4. Volume score
            vol_score = 8 if volume > vol_abs_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += vol_score * dynamic_weights[3]
            # 5. Relative volume score
            rvol_score = 7 if rvol > 1.5 else (-7 if rvol < 0.8 else 0)
            base_score += rvol_score * dynamic_weights[4]
            # 6. Delta score
            delta_score = 6 if delta > 0 else -6
            base_score += delta_score * dynamic_weights[5]
            # 7. Sniper score
            sniper_score = 8 if volume > sniper_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += sniper_score * dynamic_weights[6]
            # 8. Blocks score
            if len(indicators['volumes']) >= 10:
                highest_vol = np.max(indicators['volumes'][-10:])
                blocks_score = 5 if volume > highest_vol * 0.8 else (-5 if volume < sma_vol20 else 0)
            else:
                blocks_score = 0
            base_score += blocks_score * dynamic_weights[7]
            # 9. Tick score
            tick_score = 5 if volume > sma_vol20 else -5
            base_score += tick_score * dynamic_weights[8]
            # 10. Pattern score
            pattern_score = 7 if pattern == 1 else (5 if pattern == 2 else 0)
            base_score += pattern_score * dynamic_weights[9]
            # Convert to percentage score
            score_pct = max(0, min(100, 50 + base_score))
            return score_pct
        except Exception as e:
            Log(f"Score calculation error: {str(e)}")
            return 50.0

    def train_return_predictor(self, X, y):
        """Train return rate predictor"""
        if len(X) < 20:
            Log("Insufficient training data, skipping return predictor training")
            return False
        X_array = np.array(X)
        y_array = np.array(y).reshape(-1, 1)
        Log(f"🔧 Starting return predictor training, sample count: {len(X_array)}")
        Log(f"Return rate range: [{np.min(y_array)*100:.3f}%, {np.max(y_array)*100:.3f}%]")
        self.return_predictor.train(X_array, y_array, epochs=100)
        self.is_model_trained = True
        # Validate model prediction performance
        predictions = self.return_predictor.predict(X_array)
        mse = np.mean((predictions - y_array) ** 2)
        correlation = np.corrcoef(predictions.flatten(), y_array.flatten())[0, 1]
        Log(f"✅ Return predictor training completed")
        Log(f"MSE: {mse:.6f}, Correlation coefficient: {correlation:.4f}")
        return True
# ========== Dynamic Parameter Manager ==========
class DynamicParameterManager:
    def __init__(self):
        self.market_params = {
            "Bull": {"stop_loss": 0.02, "take_profit": 0.05},
            "Bear": {"stop_loss": 0.02, "take_profit": 0.05},
            "Eagle": {"stop_loss": 0.015, "take_profit": 0.06},
            "Wolf": {"stop_loss": 0.025, "take_profit": 0.04},
            "Momentum": {"stop_loss": 0.025, "take_profit": 0.06},
            "Sideways": {"stop_loss": 0.01, "take_profit": 0.02},
            "Volatile": {"stop_loss": 0.03, "take_profit": 0.07},
            "Unknown": {"stop_loss": 0.02, "take_profit": 0.03}
        }

    def get_params(self, market_type):
        return self.market_params.get(market_type, self.market_params["Unknown"])
# ========== Main Strategy Class ==========
class PredictiveNeuralTradingStrategy:
    def __init__(self):
        self.data_buffer = deque(maxlen=200)
        self.feature_buffer = deque(maxlen=100)
        self.label_buffer = deque(maxlen=100)  # Store return rate labels
        self.scoring_system = SmartScoringSystem()
        self.param_manager = DynamicParameterManager()
        # Training control
        self.last_retrain_time = 0
        self.retrain_interval = 3600 * 6  # Retrain every 6 hours
        self.min_train_samples = 30
        # Trading state
        self.POSITION_NONE = 0
        self.POSITION_LONG = 1
        self.POSITION_SHORT = 2
        self.position_state = self.POSITION_NONE
        # Trading records
        self.open_price = 0
        self.counter = {'win': 0, 'loss': 0}
        # K-line data management
        self.last_processed_time = 0

    def get_current_position(self):
        """Get current futures position status"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                return self.POSITION_NONE, 0
            long_amount = 0
            short_amount = 0
            for pos in positions:
                amount = pos.get('Amount', 0)
                pos_type = pos.get('Type', -1)
                if amount > 0:
                    if pos_type == 0:  # Long position
                        long_amount += amount
                    elif pos_type == 1:  # Short position
                        short_amount += amount
            net_position = long_amount - short_amount
            if net_position > 0:
                return self.POSITION_LONG, net_position
            elif net_position < 0:
                return self.POSITION_SHORT, abs(net_position)
            else:
                return self.POSITION_NONE, 0
        except Exception as e:
            Log(f"Get position error: {str(e)}")
            return self.POSITION_NONE, 0
    def collect_data(self, records):
        """Collect data and generate training samples"""
        if not records or len(records) < 55:
            return False
        # Check if there's a new completed K-line
        if len(records) > 1:
            latest_completed = records[-2]
            current_time = latest_completed['Time']
            # Skip if this K-line has already been processed
            if current_time <= self.last_processed_time:
                return False
            self.last_processed_time = current_time
        # Add completed K-lines to buffer. Only K-lines newer than the last
        # buffered one are appended, so the buffer stays a chronological
        # series without duplicated candles.
        completed_records = records[:-1] if len(records) > 1 else []
        if completed_records:
            last_time = self.data_buffer[-1]['Time'] if self.data_buffer else 0
            self.data_buffer.extend(
                r for r in completed_records if r['Time'] > last_time)
        # 🔥 Generate training samples: X[t] -> y[t+1]
        if len(self.data_buffer) >= 2:
            # Use the second-to-last record as features, last record to calculate return rate label
            buffer_list = list(self.data_buffer)
            # Calculate t-1 moment indicators as features
            feature_records = buffer_list[:-1] if len(buffer_list) > 1 else buffer_list
            indicators, features = TechnicalIndicators.calculate_indicators(
                feature_records, use_completed_only=False)
            if indicators is not None and features is not None:
                # Calculate t moment return rate relative to t-1 moment as label
                if len(buffer_list) >= 2:
                    current_close = buffer_list[-1]['Close']
                    previous_close = buffer_list[-2]['Close']
                    if previous_close > 0:
                        return_rate = (current_close - previous_close) / previous_close
                        # Add to training set
                        self.feature_buffer.append(features[0])
                        self.label_buffer.append(return_rate)
                        Log(f"New sample: Return rate={return_rate*100:.3f}%, Feature dimensions={features.shape}")
        return True

    def should_retrain(self):
        """Determine if retraining is needed"""
        import time
        current_time = time.time()
        return (current_time - self.last_retrain_time > self.retrain_interval and
                len(self.feature_buffer) >= self.min_train_samples)

    def train_model(self):
        """Train return rate predictor"""
        if len(self.feature_buffer) < self.min_train_samples:
            Log("Insufficient training data, skipping training")
            return False
        X = list(self.feature_buffer)
        y = list(self.label_buffer)
        success = self.scoring_system.train_return_predictor(X, y)
        if success:
            import time
            self.last_retrain_time = time.time()
        return success
    def get_trading_signals(self, records):
        """Get trading signals"""
        # Calculate current moment technical indicators
        indicators, features = TechnicalIndicators.calculate_indicators(
            list(self.data_buffer), use_completed_only=False)
        if indicators is None:
            return 50.0, "Unknown"
        # Detect market type
        market_type = MarketStateDetector.detect_market_type(indicators)
        # 🔥 Calculate score using dynamic weights from predicted return rate
        score = self.scoring_system.calculate_score(indicators, market_type, features)
        return score, market_type

    def check_entry_conditions(self, score, market_type):
        """Check entry conditions"""
        # Long conditions
        long_condition = ((market_type in ["Bull", "Eagle", "Momentum"]) and score > 65)
        # Short conditions
        short_condition = ((market_type in ["Bear", "Wolf"]) and score < 35)
        return long_condition, short_condition

    def open_long(self):
        """Open long position"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False
            buy_price = ticker['Last'] + 20
            order_id = exchange.CreateOrder("", "buy", buy_price, AmountOP)
            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', buy_price)
                    self.position_state = self.POSITION_LONG
                    Log(f"Long position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("Long position order not fully filled, cancelled")
            return False
        except Exception as e:
            Log(f"Open long position error: {str(e)}")
            return False

    def open_short(self):
        """Open short position"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False
            sell_price = ticker['Last'] - 20
            order_id = exchange.CreateOrder("", "sell", sell_price, AmountOP)
            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', sell_price)
                    self.position_state = self.POSITION_SHORT
                    Log(f"🎯 Short position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("Short position order not fully filled, cancelled")
            return False
        except Exception as e:
            Log(f"Open short position error: {str(e)}")
            return False
    def close_position(self):
        """Close position"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log("No position to close")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True
            ticker = exchange.GetTicker()
            if not ticker:
                return False
            close_success = True
            for pos in positions:
                if pos['Amount'] == 0:
                    continue
                amount = pos['Amount']
                pos_type = pos['Type']
                # Reset so an unexpected position type cannot reuse a stale order id
                order_id = None
                if pos_type == 0:  # Close long position
                    close_price = ticker['Last'] - 20
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"📤 Close long position: Price={close_price}, Amount={amount}")
                elif pos_type == 1:  # Close short position
                    close_price = ticker['Last'] + 20
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"📤 Close short position: Price={close_price}, Amount={amount}")
                if order_id:
                    Sleep(2000)
                    order_info = exchange.GetOrder(order_id)
                    if order_info and order_info.get('Status') == 1:
                        close_price = order_info.get('AvgPrice', close_price)
                        Log(f"✅ Position closed successfully: Fill price={close_price}")
                        self.update_profit_stats(close_price)
                    else:
                        exchange.CancelOrder(order_id)
                        close_success = False
                        Log(f"Close position order not fully filled, cancelled")
                else:
                    close_success = False
                    Log("Failed to create close position order")
            if close_success:
                self.position_state = self.POSITION_NONE
                self.open_price = 0
            return close_success
        except Exception as e:
            Log(f"Close position error: {str(e)}")
            return False

    def update_profit_stats(self, close_price):
        """Update profit/loss statistics"""
        if self.open_price == 0:
            return
        if self.position_state == self.POSITION_LONG:
            if close_price > self.open_price:
                self.counter['win'] += 1
                Log("💰 Long position profitable")
            else:
                self.counter['loss'] += 1
                Log("💸 Long position loss")
        elif self.position_state == self.POSITION_SHORT:
            if close_price < self.open_price:
                self.counter['win'] += 1
                Log("💰 Short position profitable")
            else:
                self.counter['loss'] += 1
                Log("💸 Short position loss")
    def check_stop_loss_take_profit(self, current_price, params):
        """Check stop loss and take profit, execute closing"""
        if self.open_price == 0 or self.position_state == self.POSITION_NONE:
            return False
        stop_loss_pct = params["stop_loss"]
        take_profit_pct = params["take_profit"]
        if self.position_state == self.POSITION_LONG:
            profit_pct = (current_price - self.open_price) / self.open_price
            if profit_pct <= -stop_loss_pct:
                Log(f"🔴 Long position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
                return self.execute_close_position("Stop Loss")
            elif profit_pct >= take_profit_pct:
                Log(f"🟢 Long position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
                return self.execute_close_position("Take Profit")
        elif self.position_state == self.POSITION_SHORT:
            profit_pct = (self.open_price - current_price) / self.open_price
            if profit_pct <= -stop_loss_pct:
                Log(f"🔴 Short position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
                return self.execute_close_position("Stop Loss")
            elif profit_pct >= take_profit_pct:
                Log(f"🟢 Short position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
                return self.execute_close_position("Take Profit")
        return False
    def execute_close_position(self, reason):
        """Execute close position operation (specifically for stop loss/take profit)"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log(f"{reason} close position: No position")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True
            ticker = exchange.GetTicker()
            if not ticker:
                Log(f"{reason} close position failed: Cannot get ticker")
                return False
            Log(f"🚨 Executing {reason} close position operation...")
            close_success = True
            for pos in positions:
                if pos['Amount'] == 0:
                    continue
                amount = pos['Amount']
                pos_type = pos['Type']
                order_id = None
                if pos_type == 0:  # Close long position
                    close_price = ticker['Last'] - 50
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"📤 {reason} close long position order: Price={close_price}, Amount={amount}")
                elif pos_type == 1:  # Close short position
                    close_price = ticker['Last'] + 50
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"📤 {reason} close short position order: Price={close_price}, Amount={amount}")
                if order_id:
                    Log(f"{reason} close position order ID: {order_id}")
                    Sleep(1500)
                    for retry in range(2):
                        order_info = exchange.GetOrder(order_id)
                        if order_info:
                            status = order_info.get('Status', -1)
                            if status == 1:
                                close_price = order_info.get('AvgPrice', close_price)
                                Log(f"✅ {reason} close position successful: Fill price={close_price}")
                                self.update_profit_stats(close_price)
                                break
                            elif status == 0:
                                if retry == 0:
                                    Log(f"⏳ {reason} close position order executing, waiting...")
                                    Sleep(1500)
                                else:
                                    Log(f"⚠️ {reason} close position order not fully filled, force cancel")
                                    exchange.CancelOrder(order_id)
                                    close_success = False
                            else:
                                Log(f"❌ {reason} close position order status abnormal: {status}")
                                exchange.CancelOrder(order_id)
                                close_success = False
                                break
                        else:
                            Log(f"⚠️ Cannot get {reason} close position order info, retry {retry+1}/2")
                            if retry == 1:
                                close_success = False
                else:
                    Log(f"❌ {reason} close position order creation failed")
                    close_success = False
            if close_success:
                Sleep(1000)
                new_positions = exchange.GetPosition()
                total_amount = sum(pos['Amount'] for pos in new_positions) if new_positions else 0
                if total_amount == 0:
                    Log(f"✅ {reason} close position completed, position cleared")
                    self.position_state = self.POSITION_NONE
                    self.open_price = 0
                    return True
                else:
                    Log(f"⚠️ {reason} close position incomplete, remaining position: {total_amount}")
                    return False
            else:
                Log(f"❌ {reason} close position failed")
                return False
        except Exception as e:
            Log(f"❌ {reason} close position error: {str(e)}")
            return False
    def execute_trade_logic(self, score, market_type, current_price):
        """Execute trading logic"""
        params = self.param_manager.get_params(market_type)
        # Get current actual position status
        actual_position, position_amount = self.get_current_position()
        # Synchronize internal state
        self.position_state = actual_position
        # First check stop loss/take profit (highest priority)
        if self.position_state != self.POSITION_NONE:
            if self.check_stop_loss_take_profit(current_price, params):
                Log("🚨 Stop loss/take profit triggered, executed close position, skipping other trading signals")
                return
        # Get entry conditions
        long_condition, short_condition = self.check_entry_conditions(score, market_type)
        # Execute trading logic
        if long_condition and self.position_state <= self.POSITION_NONE:
            Log(f"Long position signal: Market={market_type}, Prediction score={score:.1f} > 65")
            self.open_long()
        if short_condition and self.position_state >= self.POSITION_NONE:
            Log(f"Short position signal: Market={market_type}, Prediction score={score:.1f} < 35")
            self.open_short()
        if not long_condition and self.position_state > self.POSITION_NONE:
            Log(f"📤 Close long position signal: Market={market_type}, Prediction score={score:.1f}")
            self.close_position()
        if not short_condition and self.position_state < self.POSITION_NONE:
            Log(f"📤 Close short position signal: Market={market_type}, Prediction score={score:.1f}")
            self.close_position()
def CancelPendingOrders():
    """Cancel all pending orders"""
    while True:
        orders = exchange.GetOrders()
        if not orders:
            break
        for order in orders:
            exchange.CancelOrder(order['Id'])
        Sleep(500)
def main():
    global AmountOP, LoopInterval
    # Check initial position
    initial_positions = exchange.GetPosition()
    if initial_positions and any(pos['Amount'] > 0 for pos in initial_positions):
        raise Error_AtBeginHasPosition()
    # Cancel all pending orders
    CancelPendingOrders()
    # Initialize strategy
    strategy = PredictiveNeuralTradingStrategy()
    Log("🔮 Predictive Neural Network Futures Trading Strategy Started")
    LogProfitReset()
    # Data warm-up period
    Log("Entering data warm-up period...")
    warmup_count = 0
    warmup_target = 60
    while warmup_count < warmup_target:
        records = exchange.GetRecords()
        if records and len(records) >= 55:
            if strategy.collect_data(records):
                warmup_count += 1
                if warmup_count % 10 == 0:
                    Log(f"Warm-up progress: {warmup_count}/{warmup_target}")
        Sleep(5000)
    Log("Data warm-up completed, starting initial return rate predictor training...")
    strategy.train_model()
    # Main trading loop
    loop_count = 0
    while True:
        loop_count += 1
        # Get K-line data
        records = exchange.GetRecords()
        if not records or len(records) < 55:
            Sleep(LoopInterval * 1000)
            continue
        # Data processing
        data_updated = strategy.collect_data(records)
        # Check if retraining is needed
        if strategy.should_retrain():
            Log("Retraining return rate predictor...")
            strategy.train_model()
        # Get trading signals
        score, market_type = strategy.get_trading_signals(records)
        # Get current real-time price
        ticker = exchange.GetTicker()
        if ticker:
            current_price = ticker['Last']
        else:
            current_price = records[-1]['Close']
        # Get current parameters
        params = strategy.param_manager.get_params(market_type)
        # Priority check for stop loss/take profit (using real-time price)
        if strategy.position_state != strategy.POSITION_NONE:
            if strategy.check_stop_loss_take_profit(current_price, params):
                Log("⚡ Stop loss/take profit triggered, executed close position")
                Sleep(LoopInterval * 1000)
                continue
        # Execute trading logic (only when there's new data)
        if data_updated:
            strategy.execute_trade_logic(score, market_type, current_price)
        # Status display
        pos_state_name = {
            strategy.POSITION_NONE: "No Position",
            strategy.POSITION_LONG: "Long",
            strategy.POSITION_SHORT: "Short"
        }.get(strategy.position_state, "Unknown")
        data_status = "New Data" if data_updated else "⏸️Waiting"
        model_status = "🔮Prediction" if strategy.scoring_system.is_model_trained else "Basic"
        # Get entry conditions for display
        long_cond, short_cond = strategy.check_entry_conditions(score, market_type)
        signal_status = ""
        if long_cond:
            signal_status = "Long"
        elif short_cond:
            signal_status = "Short"
        else:
            signal_status = "Hold"
        # Display training sample count
        sample_count = len(strategy.feature_buffer)
        LogStatus(f"Loop: {loop_count}, Price: {current_price:.2f}, "
                  f"Prediction Score: {score:.1f}, Market: {market_type}, "
                  f"Position: {pos_state_name}, Signal: {signal_status}, "
                  f"Status: {data_status}, Mode: {model_status}, "
                  f"Samples: {sample_count}, "
                  f"Win: {strategy.counter['win']}, Loss: {strategy.counter['loss']}")
        Sleep(LoopInterval * 1000)
# ========== Parameter Settings ==========
AmountOP = 1 # Futures contract quantity
LoopInterval = 3 # Loop interval (seconds)
if __name__ == "__main__":
    main()
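
The stop-loss and take-profit check in the listing above reduces to one signed return compared against two thresholds. Here is a minimal standalone sketch of that rule, useful for testing the thresholds without touching the exchange. The function name `exit_reason` and the `LONG`/`SHORT` encodings of +1 and -1 are my own assumptions for illustration and are not part of the strategy code, which uses its own `POSITION_*` constants.

```python
from typing import Optional

# Assumed encodings for illustration only; the strategy defines its own
# POSITION_LONG / POSITION_SHORT constants.
LONG, SHORT = 1, -1


def exit_reason(side: int, open_price: float, current_price: float,
                stop_loss: float, take_profit: float) -> Optional[str]:
    """Return "Stop Loss", "Take Profit", or None for an open position.

    stop_loss and take_profit are fractions of the open price (0.02 = 2%).
    """
    if open_price <= 0 or side not in (LONG, SHORT):
        return None
    # Signed return: positive when the position is in profit,
    # for both long (side=+1) and short (side=-1) positions.
    profit_pct = side * (current_price - open_price) / open_price
    if profit_pct <= -stop_loss:
        return "Stop Loss"
    if profit_pct >= take_profit:
        return "Take Profit"
    return None


if __name__ == "__main__":
    print(exit_reason(LONG, 100.0, 97.0, 0.02, 0.04))
    print(exit_reason(SHORT, 100.0, 95.0, 0.02, 0.04))
    print(exit_reason(LONG, 100.0, 101.0, 0.02, 0.04))
```

Multiplying by the side folds the long and short branches into one formula, so the two thresholds are checked in a single place. The stop loss is tested first, matching the order of the checks in `check_stop_loss_take_profit`.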
