DEV Community

Cover image for From Fixed Weights to Neural Networks Machine Learning for a Pine Strategy
fmzquant
fmzquant

Posted on

From Fixed Weights to Neural Networks Machine Learning for a Pine Strategy

Accidentally Discovered an Interesting Pine Strategy
A few days ago, while browsing strategies on the Inventor forum, I came across one called "Panel Pro+ Quantum SmartPrompt". After reviewing the code, I found this strategy's approach quite interesting: it uses 10 technical indicators, assigns different weights to each indicator based on market conditions, and finally calculates a score to determine buy/sell decisions. For example, in a bull market state, trend indicators have a weight of 2.0 and RSI has a weight of 1.5; in a bear market state, the weights are different. It feels like it's mimicking human thinking: focusing on different aspects under different circumstances.
Upon closer examination, this structure is quite similar to a neural network:

  • 10 technical indicators as inputs
  • Market state classification acts like a hidden layer
  • Weight matrices serve as connection weights
  • Finally outputs a score However, the problem is that all weights are hardcoded, for example:
pine
if marketType == "Bull"
    array.set(weights, 0, 2.0) // Trend weight is fixed at 2.0
    array.set(weights, 1, 1.5) // RSI weight is fixed at 1.5
Enter fullscreen mode Exit fullscreen mode

These numbers are completely fixed by the author based on market experience, without any learning or optimization.

Idea: Make the Weights Learnable

Since the structure already resembles a neural network, why not make it truly capable of learning?
My idea is simple:

  • 1.Keep the original weight calculation method to get a "weight score"
  • 2.Use this weight score as input to train a small neural network
  • 3.Let the network learn to predict future returns from the weight score
  • 4.Decide whether to open positions based on the predicted return magnitude This way, we preserve the original strategy's logic while adding learning capability.

Implementation on the Inventor Platform

I chose the Inventor platform mainly because it supports Python and contains rich data.

Step 1: Rewrite Technical Indicators
I rewrote all indicators from the Pine script in Python, using the talib library to ensure calculation accuracy. This includes common indicators like EMA, MACD, RSI, ATR, as well as volume analysis and simple candlestick pattern recognition.

Step 2: Market State Detection
Following the original strategy's logic, I determine market types (Bull, Bear, Eagle, Wolf, etc.) based on combinations of various indicators. This part is basically a combination of if-else logic.

Step 3: Weight Score Calculation
This is the core part. I set up two sets of weights:

  • Base weights: [2.0, 1.5, 2.0, 1.3, 1.2, ...]
  • Market weights: adjusted according to different market states Final weight = Base weight ร— Market weight

Then I use these weights to calculate a weighted sum of the 10 indicators' raw scores to get the "weight score."

Step 4: Neural Network Predictor
I built a simple network:

  • Input: 1 feature (weight score)
  • Hidden layer: 16 neurons with ReLU activation
  • Output: predicted return, constrained to ยฑ5% using tanh Training objective: Use the weight score at time t-1 to predict the price change at time t.

Step 5: Trading Logic
Instead of buying/selling directly based on score levels, I now look at predicted returns:

  • Predicted return > 1.5%: go long or close short and go long
  • Predicted return < -1.5%: go short or close long and go short
  • Other cases: maintain current position I also keep stop-loss and take-profit mechanisms to ensure controllable risk.

Observations from Actual Operation

Data Collection
The strategy can normally collect training data. Each time there's a new candlestick, it uses the previous candlestick's weight score as a feature and the current candlestick's percentage change relative to the previous one as the label.

The data looks roughly like this:

Weight Score = 15.6, Return = +0.8%
Weight Score = -8.2, Return = -1.2%
Weight Score = 22.1, Return = +0.3%
Enter fullscreen mode Exit fullscreen mode

Model Training

The neural network trains normally, with MSE loss gradually decreasing. I set it to retrain every 4 hours to ensure the model can adapt to market changes.

Prediction Performance

The model's predictions do show some correlation with actual returns, but it's not particularly strong. Main issues:

  • 1.Single feature is too simple, possibly insufficient information
  • 2.Short-term price movements have high randomness
  • 3.Futures markets have considerable noise Trading Performance Thanks to stop-loss and take-profit protection, risk control for individual trades is decent. However, overall profitability is mediocre, mainly due to insufficient prediction accuracy.


Problems Encountered

  • Features Too Limited: Using only the weight score as a single feature is indeed too simple. Markets are so complex that one number can hardly capture everything.
  • Unstable Sample Quality: Contract prices have large short-term fluctuations, and often the ups and downs are actually random, making the quality of training samples unstable.
  • Overfitting Risk: Although the network is simple, there's still potential for overfitting when sample size is limited.
  • Real-time Requirements: Online learning needs to balance training time with real-time performance.
    Limited Time, Insufficient Optimization
    This strategy has many areas for improvement, but with limited time and energy, I couldn't optimize it deeply:

  • Feature Aspects: Could add more technical indicators or use statistical features of price sequences.

  • Model Aspects: Could try sequence models like LSTM, or use ensemble methods with multiple models.

  • Data Aspects: Improve sample quality and add data cleaning.

  • Risk Management: Refine dynamic stop-loss and optimize position management.
    Gains and Reflections
    This exploration taught me an important lesson: good inspiration is all about timely implementation! When I saw the weight matrix design in the Pine script, I immediately thought of the possibility of improving it with neural networks. If I had just thought about it without taking action, or procrastinated, this idea would likely have been forgotten.
    Fortunately, the Inventor platform provided a Python environment and data interfaces, allowing me to quickly turn the idea into runnable code. From generating the idea to completing the basic implementation took only about a day.
    Although the final strategy's performance was mediocre, running it in practice at least validated that this approach was feasible. More importantly, new ideas and improvement directions emerged during the implementation process. Without taking action promptly, none of these subsequent discoveries and thoughts would have occurred.
    Theoretical discussions can never compare to actually writing code, running data, and examining results. Quantitative trading is like this - there are many ideas, but truly valuable ones are those that get quickly implemented and validated.

python
'''backtest
start: 2025-07-31 00:00:00
end: 2025-08-07 00:00:00
period: 1h
basePeriod: 5m
exchanges: [{"eid":"Futures_Binance","currency":"ETH_USDT","balance":5000000,"fee":[0.01,0.01]}]
'''

import numpy as np
from collections import deque
import talib as TA

# ========== Exception Classes ==========
class Error_noSupport(BaseException):
    def __init__(self):
        Log("Only futures trading is supported! #FF0000")

class Error_AtBeginHasPosition(BaseException):
    def __init__(self):
        Log("Futures position exists at startup! #FF0000")

# ========== Return Prediction Neural Network ==========
class ReturnPredictor:
    def __init__(self, input_size=10, hidden_size=20, output_size=1):
        """Return prediction network: X[t] -> y[t+1] (return rate)"""
        self.W1 = np.random.randn(input_size, hidden_size) * 0.1
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size) * 0.1
        self.b2 = np.zeros((1, output_size))
        self.learning_rate = 0.001

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -250, 250)))

    def tanh(self, x):
        return np.tanh(x)

    def forward(self, X):
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = self.sigmoid(self.z1)
        self.z2 = np.dot(self.a1, self.W2) + self.b2
        # Output predicted return rate, using tanh to limit to reasonable range
        self.a2 = self.tanh(self.z2) * 0.1  # Limit to ยฑ10% range
        return self.a2

    def backward(self, X, y, output):
        m = X.shape[0]

        # MSE loss gradient
        dZ2 = (output - y) / m
        # tanh derivative
        tanh_derivative = 1 - (output / 0.1) ** 2
        dZ2 = dZ2 * 0.1 * tanh_derivative

        dW2 = np.dot(self.a1.T, dZ2)
        db2 = np.sum(dZ2, axis=0, keepdims=True)

        dA1 = np.dot(dZ2, self.W2.T)
        dZ1 = dA1 * self.a1 * (1 - self.a1)  # sigmoid derivative
        dW1 = np.dot(X.T, dZ1)
        db1 = np.sum(dZ1, axis=0, keepdims=True)

        # Update weights
        self.W2 -= self.learning_rate * dW2
        self.b2 -= self.learning_rate * db2
        self.W1 -= self.learning_rate * dW1
        self.b1 -= self.learning_rate * db1

    def train(self, X, y, epochs=100):
        for i in range(epochs):
            output = self.forward(X)
            self.backward(X, y, output)
            if i % 20 == 0:
                loss = np.mean((output - y) ** 2)
                Log(f"Return prediction training epoch {i}, MSE loss: {loss:.6f}")

    def predict(self, X):
        return self.forward(X)

# ========== Technical Indicators Calculation Class ==========
class TechnicalIndicators:
    @staticmethod
    def calculate_indicators(records, use_completed_only=True):
        """Calculate technical indicators and features"""
        if len(records) < 55:
            return None, None

        # Only use completed candlestick data
        if use_completed_only and len(records) > 1:
            working_records = records[:-1]
        else:
            working_records = records

        if len(working_records) < 55:
            return None, None

        closes = np.array([r['Close'] for r in working_records])
        highs = np.array([r['High'] for r in working_records])
        lows = np.array([r['Low'] for r in working_records])
        volumes = np.array([r['Volume'] for r in working_records])
        opens = np.array([r['Open'] for r in working_records])

        try:
            # Basic indicators
            ema_55 = TA.EMA(closes, timeperiod=55)
            sma_vol20 = TA.SMA(volumes, timeperiod=20)
            macd, signal_line, _ = TA.MACD(closes)
            rsi_val = TA.RSI(closes, timeperiod=14)
            atr14 = TA.ATR(highs, lows, closes, timeperiod=14)
            range20 = TA.STDDEV(closes, timeperiod=20)

            # Calculate derived indicators
            sma_atr20 = TA.SMA(atr14, timeperiod=20)
            sma_range20 = TA.SMA(range20, timeperiod=20)
            rvol = volumes / sma_vol20 if sma_vol20[-1] > 0 else np.ones_like(volumes)
            delta = closes - opens

            # Calculate volume thresholds
            vol_abs_thresh = sma_vol20 * 1.2
            sniper_thresh = np.percentile(volumes[-40:], 80) if len(volumes) >= 40 else sma_vol20[-1]

            # Trend
            trend = np.where(closes > ema_55, 1, np.where(closes < ema_55, -1, 0))

            # Simplified candlestick patterns
            body_size = np.abs(closes - opens)
            total_range = highs - lows

            # Hammer pattern
            is_hammer = ((total_range > 3 * body_size) & 
                        ((closes - lows) / (total_range + 0.001) > 0.6) & 
                        ((opens - lows) / (total_range + 0.001) > 0.6))

            # Engulfing pattern
            is_engulfing = np.zeros_like(closes, dtype=bool)
            if len(closes) >= 2:
                is_engulfing[1:] = ((closes[1:] > opens[:-1]) & 
                                   (opens[1:] < closes[:-1]) & 
                                   (closes[1:] > opens[1:]) & 
                                   (opens[1:] < closes[1:]))

            pattern = np.where(is_hammer, 1, np.where(is_engulfing, 2, 0))

            # ๐Ÿ”ฅ Calculate normalized feature vectors (for neural network input)
            features = []

            # 1. Trend feature
            if len(ema_55) > 0 and not np.isnan(ema_55[-1]):
                trend_feature = (closes[-1] - ema_55[-1]) / ema_55[-1]
                features.append(np.tanh(trend_feature * 100))
            else:
                features.append(0)

            # 2. RSI feature
            if len(rsi_val) > 0 and not np.isnan(rsi_val[-1]):
                rsi_feature = (rsi_val[-1] - 50) / 50
                features.append(rsi_feature)
            else:
                features.append(0)

            # 3. MACD feature
            if len(macd) > 0 and not np.isnan(macd[-1]) and not np.isnan(signal_line[-1]):
                macd_feature = (macd[-1] - signal_line[-1]) / closes[-1] if closes[-1] > 0 else 0
                features.append(np.tanh(macd_feature * 1000))
            else:
                features.append(0)

            # 4. Volume feature
            if len(vol_abs_thresh) > 0 and vol_abs_thresh[-1] > 0:
                vol_feature = volumes[-1] / vol_abs_thresh[-1] - 1
                features.append(np.tanh(vol_feature))
            else:
                features.append(0)

            # 5. Relative volume feature
            if len(rvol) > 0 and not np.isnan(rvol[-1]):
                rvol_feature = rvol[-1] - 1
                features.append(np.tanh(rvol_feature))
            else:
                features.append(0)

            # 6. Delta feature
            if len(delta) > 0 and not np.isnan(delta[-1]) and closes[-1] > 0:
                delta_feature = delta[-1] / closes[-1]
                features.append(np.tanh(delta_feature * 100))
            else:
                features.append(0)

            # 7. ATR feature
            if len(atr14) > 0 and len(sma_atr20) > 0 and sma_atr20[-1] > 0:
                atr_feature = atr14[-1] / sma_atr20[-1] - 1
                features.append(np.tanh(atr_feature))
            else:
                features.append(0)

            # 8. Blocks feature
            if len(volumes) >= 10:
                highest_vol = np.max(volumes[-10:])
                blocks_feature = volumes[-1] / highest_vol - 0.8 if highest_vol > 0 else 0
                features.append(np.tanh(blocks_feature * 5))
            else:
                features.append(0)

            # 9. Tick feature
            if len(sma_vol20) > 0 and sma_vol20[-1] > 0:
                tick_feature = volumes[-1] / sma_vol20[-1] - 1
                features.append(np.tanh(tick_feature))
            else:
                features.append(0)

            # 10. Pattern feature
            pattern_feature = pattern[-1] / 2.0 if len(pattern) > 0 else 0
            features.append(pattern_feature)

            # Ensure correct feature count
            while len(features) < 10:
                features.append(0)

            features = np.array(features[:10]).reshape(1, -1)

            indicators = {
                'ema_55': ema_55,
                'sma_vol20': sma_vol20,
                'macd': macd,
                'signal_line': signal_line,
                'rsi_val': rsi_val,
                'atr14': atr14,
                'range20': range20,
                'sma_atr20': sma_atr20,
                'sma_range20': sma_range20,
                'rvol': rvol,
                'delta': delta,
                'vol_abs_thresh': vol_abs_thresh,
                'sniper_thresh': sniper_thresh,
                'trend': trend,
                'pattern': pattern,
                'volumes': volumes,
                'closes': closes,
                'highs': highs,
                'lows': lows
            }

            return indicators, features

        except Exception as e:
            Log(f"Technical indicators calculation error: {str(e)}")
            return None, None

# ========== Market State Detection Class ==========
class MarketStateDetector:
    @staticmethod
    def detect_market_type(indicators):
        """Detect market state"""
        if indicators is None:
            return "Unknown"

        try:
            # Get latest values
            close = indicators['closes'][-1]
            ema_55 = indicators['ema_55'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            rsi_val = indicators['rsi_val'][-1]
            atr14 = indicators['atr14'][-1]
            volume = indicators['volumes'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            sma_atr20 = indicators['sma_atr20'][-1]
            range20 = indicators['range20'][-1]
            sma_range20 = indicators['sma_range20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]

            # Check validity
            if (np.isnan(ema_55) or np.isnan(macd) or np.isnan(signal_line) or 
                np.isnan(rsi_val) or np.isnan(atr14) or np.isnan(sma_atr20)):
                return "Unknown"

            # Market type judgment
            is_bull = (close > ema_55 and macd > signal_line and rsi_val > 50 and rvol > 1)
            is_bear = (close < ema_55 and macd < signal_line and rsi_val < 50 and volume > sma_vol20)
            is_sideways = (abs(close - ema_55) < atr14 * 0.5 and atr14 < sma_atr20)
            is_volatile = (atr14 > sma_atr20 * 1.2)

            # Judgments requiring historical data
            if len(indicators['closes']) >= 2:
                price_change = indicators['closes'][-1] - indicators['closes'][-2]
                is_momentum = (price_change > atr14 * 1.5 and volume > sma_vol20 * 1.5)
                is_wolf = (price_change < -atr14 and close < ema_55)
            else:
                is_momentum = False
                is_wolf = False

            is_mean_rev = (rsi_val > 70 or rsi_val < 30)
            is_box = (is_sideways and range20 < sma_range20 * 0.8)
            is_macro = (abs(delta) > atr14 * 2) if not np.isnan(delta) else False
            is_eagle = (is_bull and atr14 < sma_atr20 * 0.8)

            # Priority judgment
            if is_eagle:
                return "Eagle"
            elif is_bull:
                return "Bull"
            elif is_wolf:
                return "Wolf"
            elif is_bear:
                return "Bear"
            elif is_box:
                return "Box"
            elif is_sideways:
                return "Sideways"
            elif is_volatile:
                return "Volatile"
            elif is_momentum:
                return "Momentum"
            elif is_mean_rev:
                return "MeanRev"
            elif is_macro:
                return "Macro"
            else:
                return "Unknown"

        except Exception as e:
            Log(f"Market state detection error: {str(e)}")
            return "Unknown"

# ========== Dynamic Weight Generator ==========
class DynamicWeightGenerator:
    @staticmethod
    def generate_weights_from_predicted_return(predicted_return, market_type):
        """Generate dynamic weights based on predicted return rate and market state"""

        # Base weight matrix (different market types)
        base_weights_matrix = {
            "Bull": [2.0, 1.5, 2.0, 1.3, 1.2, 1.0, 1.2, 1.0, 1.0, 1.0],
            "Bear": [2.0, 1.5, 2.0, 1.5, 1.3, 1.1, 1.2, 1.1, 1.0, 1.0],
            "Eagle": [2.2, 1.4, 2.1, 1.2, 1.3, 1.1, 1.1, 1.0, 1.0, 1.1],
            "Wolf": [1.8, 1.6, 1.8, 1.6, 1.2, 1.0, 1.3, 1.2, 1.0, 0.9],
            "Momentum": [1.5, 1.2, 1.8, 2.0, 2.0, 1.5, 1.5, 1.3, 1.2, 1.0],
            "Sideways": [1.0, 1.4, 1.0, 0.8, 0.7, 1.0, 0.9, 0.8, 1.0, 1.3],
            "Volatile": [1.2, 1.5, 1.3, 1.6, 1.8, 1.2, 1.4, 1.3, 1.4, 1.0],
        }

        base_weights = base_weights_matrix.get(market_type, [1.0] * 10)

        # ๐Ÿ”ฅ Dynamically adjust weights based on predicted return rate
        adjustment_factors = [1.0] * 10

        # Strength of predicted return rate
        return_strength = abs(predicted_return)
        return_direction = 1 if predicted_return > 0 else -1

        if return_strength > 0.02:  # Strong prediction signal > 2%
            if return_direction > 0:  # Predict upward
                adjustment_factors[0] *= 1.3  # Enhance trend weight
                adjustment_factors[2] *= 1.2  # Enhance MACD weight
                adjustment_factors[4] *= 1.15 # Enhance relative volume weight
                adjustment_factors[1] *= 0.9  # Reduce RSI weight
            else:  # Predict downward
                adjustment_factors[1] *= 1.3  # Enhance RSI weight
                adjustment_factors[3] *= 1.2  # Enhance volume weight
                adjustment_factors[0] *= 0.9  # Reduce trend weight

        elif return_strength > 0.01:  # Medium prediction signal 1%-2%
            if return_direction > 0:
                adjustment_factors[0] *= 1.15
                adjustment_factors[2] *= 1.1
            else:
                adjustment_factors[1] *= 1.15
                adjustment_factors[3] *= 1.1

        # Volatility adjustment
        if return_strength > 0.03:  # High volatility expectation > 3%
            adjustment_factors[4] *= 1.2  # Enhance relative volume weight
            adjustment_factors[6] *= 1.15 # Enhance sniper weight
            adjustment_factors[7] *= 1.1  # Enhance blocks weight

        # Generate final dynamic weights
        dynamic_weights = [base_weights[i] * adjustment_factors[i] for i in range(10)]

        # Weight normalization (optional)
        # total_weight = sum(dynamic_weights)
        # dynamic_weights = [w / total_weight * 10 for w in dynamic_weights]

        return dynamic_weights

# ========== Smart Scoring System ==========
class SmartScoringSystem:
    def __init__(self):
        self.return_predictor = ReturnPredictor()
        self.weight_generator = DynamicWeightGenerator()
        self.is_model_trained = False

    def calculate_score(self, indicators, market_type, features=None):
        """Calculate trading score (using dynamic weights from predicted return rate)"""
        if indicators is None:
            return 50.0

        try:
            # ๐Ÿ”ฅ Core logic: Use current indicators to predict next period return rate
            if self.is_model_trained and features is not None:
                predicted_return = self.return_predictor.predict(features)[0, 0]

            else:
                predicted_return = 0.0
                Log(f"๐Ÿ“Š Using base weights for calculation")

            # Generate dynamic weights based on predicted return rate
            dynamic_weights = self.weight_generator.generate_weights_from_predicted_return(
                predicted_return, market_type)

            # Get latest indicator values
            trend = indicators['trend'][-1]
            rsi_val = indicators['rsi_val'][-1]
            macd = indicators['macd'][-1]
            signal_line = indicators['signal_line'][-1]
            volume = indicators['volumes'][-1]
            vol_abs_thresh = indicators['vol_abs_thresh'][-1]
            sma_vol20 = indicators['sma_vol20'][-1]
            rvol = indicators['rvol'][-1]
            delta = indicators['delta'][-1]
            sniper_thresh = indicators['sniper_thresh']
            pattern = indicators['pattern'][-1]

            # Calculate individual scores
            base_score = 0.0

            # 1. Trend score
            trend_score = 20 if trend == 1 else (-20 if trend == -1 else 0)
            base_score += trend_score * dynamic_weights[0]

            # 2. RSI score
            rsi_score = -10 if rsi_val > 70 else (10 if rsi_val < 30 else 0)
            base_score += rsi_score * dynamic_weights[1]

            # 3. MACD score
            macd_score = 10 if macd > signal_line else -10
            base_score += macd_score * dynamic_weights[2]

            # 4. Volume score
            vol_score = 8 if volume > vol_abs_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += vol_score * dynamic_weights[3]

            # 5. Relative volume score
            rvol_score = 7 if rvol > 1.5 else (-7 if rvol < 0.8 else 0)
            base_score += rvol_score * dynamic_weights[4]

            # 6. Delta score
            delta_score = 6 if delta > 0 else -6
            base_score += delta_score * dynamic_weights[5]

            # 7. Sniper score
            sniper_score = 8 if volume > sniper_thresh else (-8 if volume < sma_vol20 else 0)
            base_score += sniper_score * dynamic_weights[6]

            # 8. Blocks score
            if len(indicators['volumes']) >= 10:
                highest_vol = np.max(indicators['volumes'][-10:])
                blocks_score = 5 if volume > highest_vol * 0.8 else (-5 if volume < sma_vol20 else 0)
            else:
                blocks_score = 0
            base_score += blocks_score * dynamic_weights[7]

            # 9. Tick score
            tick_score = 5 if volume > sma_vol20 else -5
            base_score += tick_score * dynamic_weights[8]

            # 10. Pattern score
            pattern_score = 7 if pattern == 1 else (5 if pattern == 2 else 0)
            base_score += pattern_score * dynamic_weights[9]

            # Convert to percentage score
            score_pct = max(0, min(100, 50 + base_score))

            return score_pct

        except Exception as e:
            Log(f"Score calculation error: {str(e)}")
            return 50.0

    def train_return_predictor(self, X, y):
        """Train return rate predictor"""
        if len(X) < 20:
            Log("Insufficient training data, skipping return predictor training")
            return False

        X_array = np.array(X)
        y_array = np.array(y).reshape(-1, 1)

        Log(f"๐Ÿง  Starting return predictor training, sample count: {len(X_array)}")
        Log(f"๐Ÿ“Š Return rate range: [{np.min(y_array)*100:.3f}%, {np.max(y_array)*100:.3f}%]")

        self.return_predictor.train(X_array, y_array, epochs=100)
        self.is_model_trained = True

        # Validate model prediction performance
        predictions = self.return_predictor.predict(X_array)
        mse = np.mean((predictions - y_array) ** 2)
        correlation = np.corrcoef(predictions.flatten(), y_array.flatten())[0, 1]

        Log(f"โœ… Return predictor training completed")
        Log(f"๐Ÿ“ˆ MSE: {mse:.6f}, Correlation coefficient: {correlation:.4f}")

        return True

# ========== Dynamic Parameter Manager ==========
class DynamicParameterManager:
    def __init__(self):
        self.market_params = {
            "Bull": {"stop_loss": 0.02, "take_profit": 0.05},
            "Bear": {"stop_loss": 0.02, "take_profit": 0.05},
            "Eagle": {"stop_loss": 0.015, "take_profit": 0.06},
            "Wolf": {"stop_loss": 0.025, "take_profit": 0.04},
            "Momentum": {"stop_loss": 0.025, "take_profit": 0.06},
            "Sideways": {"stop_loss": 0.01, "take_profit": 0.02},
            "Volatile": {"stop_loss": 0.03, "take_profit": 0.07},
            "Unknown": {"stop_loss": 0.02, "take_profit": 0.03}
        }

    def get_params(self, market_type):
        return self.market_params.get(market_type, self.market_params["Unknown"])

# ========== Main Strategy Class ==========
class PredictiveNeuralTradingStrategy:
    def __init__(self):
        self.data_buffer = deque(maxlen=200)
        self.feature_buffer = deque(maxlen=100)
        self.label_buffer = deque(maxlen=100)  # Store return rate labels
        self.scoring_system = SmartScoringSystem()
        self.param_manager = DynamicParameterManager()

        # Training control
        self.last_retrain_time = 0
        self.retrain_interval = 3600 * 6  # Retrain every 6 hours
        self.min_train_samples = 30

        # Trading state
        self.POSITION_NONE = 0
        self.POSITION_LONG = 1
        self.POSITION_SHORT = 2
        self.position_state = self.POSITION_NONE

        # Trading records
        self.open_price = 0
        self.counter = {'win': 0, 'loss': 0}

        # K-line data management
        self.last_processed_time = 0

    def get_current_position(self):
        """Get current futures position status"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                return self.POSITION_NONE, 0

            long_amount = 0
            short_amount = 0

            for pos in positions:
                amount = pos.get('Amount', 0)
                pos_type = pos.get('Type', -1)

                if amount > 0:
                    if pos_type == 0:  # Long position
                        long_amount += amount
                    elif pos_type == 1:  # Short position
                        short_amount += amount

            net_position = long_amount - short_amount

            if net_position > 0:
                return self.POSITION_LONG, net_position
            elif net_position < 0:
                return self.POSITION_SHORT, abs(net_position)
            else:
                return self.POSITION_NONE, 0

        except Exception as e:
            Log(f"Get position error: {str(e)}")
            return self.POSITION_NONE, 0

    def collect_data(self, records):
        """Collect data and generate training samples"""
        if not records or len(records) < 55:
            return False

        # Check if there's a new completed K-line
        if len(records) > 1:
            latest_completed = records[-2]
            current_time = latest_completed['Time']

            # Skip if this K-line has already been processed
            if current_time <= self.last_processed_time:
                return False

            self.last_processed_time = current_time

        # Add completed K-lines to buffer
        completed_records = records[:-1] if len(records) > 1 else []
        if completed_records:
            self.data_buffer.extend(completed_records[-5:])

        # ๐Ÿ”ฅ Generate training samples: X[t] -> y[t+1]
        if len(self.data_buffer) >= 2:
            # Use the second-to-last record as features, last record to calculate return rate label
            buffer_list = list(self.data_buffer)

            # Calculate t-1 moment indicators as features
            feature_records = buffer_list[:-1] if len(buffer_list) > 1 else buffer_list
            indicators, features = TechnicalIndicators.calculate_indicators(
                feature_records, use_completed_only=False)

            if indicators is not None and features is not None:
                # Calculate t moment return rate relative to t-1 moment as label
                if len(buffer_list) >= 2:
                    current_close = buffer_list[-1]['Close']
                    previous_close = buffer_list[-2]['Close']

                    if previous_close > 0:
                        return_rate = (current_close - previous_close) / previous_close

                        # Add to training set
                        self.feature_buffer.append(features[0])
                        self.label_buffer.append(return_rate)

                        Log(f"๐Ÿ“ˆ New sample: Return rate={return_rate*100:.3f}%, Feature dimensions={features.shape}")

        return True

    def should_retrain(self):
        """Determine if retraining is needed"""
        import time
        current_time = time.time()
        return (current_time - self.last_retrain_time > self.retrain_interval and 
                len(self.feature_buffer) >= self.min_train_samples)

    def train_model(self):
        """Train return rate predictor"""
        if len(self.feature_buffer) < self.min_train_samples:
            Log("Insufficient training data, skipping training")
            return False

        X = list(self.feature_buffer)
        y = list(self.label_buffer)

        success = self.scoring_system.train_return_predictor(X, y)

        if success:
            import time
            self.last_retrain_time = time.time()

        return success

    def get_trading_signals(self, records):
        """Get trading signals"""
        # Calculate current moment technical indicators
        indicators, features = TechnicalIndicators.calculate_indicators(
            list(self.data_buffer), use_completed_only=False)
        if indicators is None:
            return 50.0, "Unknown"

        # Detect market type
        market_type = MarketStateDetector.detect_market_type(indicators)

        # ๐Ÿ”ฅ Calculate score using dynamic weights from predicted return rate
        score = self.scoring_system.calculate_score(indicators, market_type, features)

        return score, market_type

    def check_entry_conditions(self, score, market_type):
        """Check entry conditions"""
        # Long conditions
        long_condition = ((market_type in ["Bull", "Eagle", "Momentum"]) and score > 65)

        # Short conditions  
        short_condition = ((market_type in ["Bear", "Wolf"]) and score < 35)

        return long_condition, short_condition

    def open_long(self):
        """Open long position"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False

            buy_price = ticker['Last'] + 20
            order_id = exchange.CreateOrder("", "buy", buy_price, AmountOP)

            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', buy_price)
                    self.position_state = self.POSITION_LONG
                    Log(f"๐Ÿš€ Long position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("Long position order not fully filled, cancelled")

            return False

        except Exception as e:
            Log(f"Open long position error: {str(e)}")
            return False

    def open_short(self):
        """Open short position"""
        try:
            ticker = exchange.GetTicker()
            if not ticker:
                return False

            sell_price = ticker['Last'] - 20
            order_id = exchange.CreateOrder("", "sell", sell_price, AmountOP)

            if order_id:
                Sleep(2000)
                order_info = exchange.GetOrder(order_id)
                if order_info and order_info.get('Status') == 1:
                    self.open_price = order_info.get('AvgPrice', sell_price)
                    self.position_state = self.POSITION_SHORT
                    Log(f"๐ŸŽฏ Short position opened successfully: Price={self.open_price}, Amount={AmountOP}")
                    return True
                else:
                    exchange.CancelOrder(order_id)
                    Log("Short position order not fully filled, cancelled")

            return False

        except Exception as e:
            Log(f"Open short position error: {str(e)}")
            return False

    def close_position(self):
        """Close position"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log("No position to close")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True

            ticker = exchange.GetTicker()
            if not ticker:
                return False

            close_success = True

            for pos in positions:
                if pos['Amount'] == 0:
                    continue

                amount = pos['Amount']
                pos_type = pos['Type']

                if pos_type == 0:  # Close long position
                    close_price = ticker['Last'] - 20
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"๐Ÿ“ค Close long position: Price={close_price}, Amount={amount}")

                elif pos_type == 1:  # Close short position
                    close_price = ticker['Last'] + 20
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"๐Ÿ“ค Close short position: Price={close_price}, Amount={amount}")

                if order_id:
                    Sleep(2000)
                    order_info = exchange.GetOrder(order_id)
                    if order_info and order_info.get('Status') == 1:
                        close_price = order_info.get('AvgPrice', close_price)
                        Log(f"โœ… Position closed successfully: Fill price={close_price}")
                        self.update_profit_stats(close_price)
                    else:
                        exchange.CancelOrder(order_id)
                        close_success = False
                        Log(f"Close position order not fully filled, cancelled")
                else:
                    close_success = False
                    Log("Failed to create close position order")

            if close_success:
                self.position_state = self.POSITION_NONE
                self.open_price = 0

            return close_success

        except Exception as e:
            Log(f"Close position error: {str(e)}")
            return False

    def update_profit_stats(self, close_price):
        """Update profit/loss statistics"""
        if self.open_price == 0:
            return

        if self.position_state == self.POSITION_LONG:
            if close_price > self.open_price:
                self.counter['win'] += 1
                Log("๐Ÿ’ฐ Long position profitable")
            else:
                self.counter['loss'] += 1
                Log("๐Ÿ’ธ Long position loss")
        elif self.position_state == self.POSITION_SHORT:
            if close_price < self.open_price:
                self.counter['win'] += 1
                Log("๐Ÿ’ฐ Short position profitable")
            else:
                self.counter['loss'] += 1
                Log("๐Ÿ’ธ Short position loss")

    def check_stop_loss_take_profit(self, current_price, params):
        """Check stop loss and take profit, execute closing"""
        if self.open_price == 0 or self.position_state == self.POSITION_NONE:
            return False

        stop_loss_pct = params["stop_loss"]
        take_profit_pct = params["take_profit"]

        if self.position_state == self.POSITION_LONG:
            profit_pct = (current_price - self.open_price) / self.open_price

            if profit_pct <= -stop_loss_pct:
                Log(f"๐Ÿ”ด Long position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
                return self.execute_close_position("Stop Loss")
            elif profit_pct >= take_profit_pct:
                Log(f"๐ŸŸข Long position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
                return self.execute_close_position("Take Profit")

        elif self.position_state == self.POSITION_SHORT:
            profit_pct = (self.open_price - current_price) / self.open_price

            if profit_pct <= -stop_loss_pct:
                Log(f"๐Ÿ”ด Short position stop loss triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Loss={profit_pct:.4f}")
                return self.execute_close_position("Stop Loss")
            elif profit_pct >= take_profit_pct:
                Log(f"๐ŸŸข Short position take profit triggered: Open price={self.open_price:.2f}, Current price={current_price:.2f}, Profit={profit_pct:.4f}")
                return self.execute_close_position("Take Profit")

        return False

    def execute_close_position(self, reason):
        """Execute close position operation (specifically for stop loss/take profit)"""
        try:
            positions = exchange.GetPosition()
            if not positions:
                Log(f"{reason} close position: No position")
                self.position_state = self.POSITION_NONE
                self.open_price = 0
                return True

            ticker = exchange.GetTicker()
            if not ticker:
                Log(f"{reason} close position failed: Cannot get ticker")
                return False

            Log(f"๐Ÿšจ Executing {reason} close position operation...")
            close_success = True

            for pos in positions:
                if pos['Amount'] == 0:
                    continue

                amount = pos['Amount']
                pos_type = pos['Type']
                order_id = None

                if pos_type == 0:  # Close long position
                    close_price = ticker['Last'] - 50
                    order_id = exchange.CreateOrder("", "closebuy", close_price, amount)
                    Log(f"๐Ÿ“ค {reason} close long position order: Price={close_price}, Amount={amount}")

                elif pos_type == 1:  # Close short position
                    close_price = ticker['Last'] + 50
                    order_id = exchange.CreateOrder("", "closesell", close_price, amount)
                    Log(f"๐Ÿ“ค {reason} close short position order: Price={close_price}, Amount={amount}")

                if order_id:
                    Log(f"๐Ÿ“‹ {reason} close position order ID: {order_id}")
                    Sleep(1500)

                    for retry in range(2):
                        order_info = exchange.GetOrder(order_id)
                        if order_info:
                            status = order_info.get('Status', -1)
                            if status == 1:
                                close_price = order_info.get('AvgPrice', close_price)
                                Log(f"โœ… {reason} close position successful: Fill price={close_price}")
                                self.update_profit_stats(close_price)
                                break
                            elif status == 0:
                                if retry == 0:
                                    Log(f"โณ {reason} close position order executing, waiting...")
                                    Sleep(1500)
                                else:
                                    Log(f"โš ๏ธ {reason} close position order not fully filled, force cancel")
                                    exchange.CancelOrder(order_id)
                                    close_success = False
                            else:
                                Log(f"โŒ {reason} close position order status abnormal: {status}")
                                exchange.CancelOrder(order_id)
                                close_success = False
                                break
                        else:
                            Log(f"โš ๏ธ Cannot get {reason} close position order info, retry {retry+1}/2")
                            if retry == 1:
                                close_success = False
                else:
                    Log(f"โŒ {reason} close position order creation failed")
                    close_success = False

            if close_success:
                Sleep(1000)
                new_positions = exchange.GetPosition()
                total_amount = sum(pos['Amount'] for pos in new_positions) if new_positions else 0

                if total_amount == 0:
                    Log(f"โœ… {reason} close position completed, position cleared")
                    self.position_state = self.POSITION_NONE
                    self.open_price = 0
                    return True
                else:
                    Log(f"โš ๏ธ {reason} close position incomplete, remaining position: {total_amount}")
                    return False
            else:
                Log(f"โŒ {reason} close position failed")
                return False

        except Exception as e:
            Log(f"โŒ {reason} close position error: {str(e)}")
            return False

    def execute_trade_logic(self, score, market_type, current_price):
        """Execute trading logic"""
        params = self.param_manager.get_params(market_type)

        # Get current actual position status
        actual_position, position_amount = self.get_current_position()

        # Synchronize internal state
        self.position_state = actual_position

        # First check stop loss/take profit (highest priority)
        if self.position_state != self.POSITION_NONE:
            if self.check_stop_loss_take_profit(current_price, params):
                Log("๐Ÿšจ Stop loss/take profit triggered, executed close position, skipping other trading signals")
                return

        # Get entry conditions
        long_condition, short_condition = self.check_entry_conditions(score, market_type)

        # Execute trading logic
        if long_condition and self.position_state <= self.POSITION_NONE:
            Log(f"๐Ÿ“ˆ Long position signal: Market={market_type}, Prediction score={score:.1f} > 65")
            self.open_long()

        if short_condition and self.position_state >= self.POSITION_NONE:
            Log(f"๐Ÿ“‰ Short position signal: Market={market_type}, Prediction score={score:.1f} < 35")
            self.open_short()

        if not long_condition and self.position_state > self.POSITION_NONE:
            Log(f"๐Ÿ“ค Close long position signal: Market={market_type}, Prediction score={score:.1f}")
            self.close_position()

        if not short_condition and self.position_state < self.POSITION_NONE:
            Log(f"๐Ÿ“ค Close short position signal: Market={market_type}, Prediction score={score:.1f}")
            self.close_position()

def CancelPendingOrders():
    """Cancel all pending orders"""
    while True:
        orders = exchange.GetOrders()
        if not orders:
            break
        for order in orders:
            exchange.CancelOrder(order['Id'])
            Sleep(500)

def main():
    global AmountOP, LoopInterval

    # Check initial position
    initial_positions = exchange.GetPosition()
    if initial_positions and any(pos['Amount'] > 0 for pos in initial_positions):
        raise Error_AtBeginHasPosition()

    # Cancel all pending orders
    CancelPendingOrders()

    # Initialize strategy
    strategy = PredictiveNeuralTradingStrategy()

    Log("๐Ÿ”ฎ Predictive Neural Network Futures Trading Strategy Started")
    LogProfitReset()

    # Data warm-up period
    Log("Entering data warm-up period...")
    warmup_count = 0
    warmup_target = 60

    while warmup_count < warmup_target:
        records = exchange.GetRecords()
        if records and len(records) >= 55:
            if strategy.collect_data(records):
                warmup_count += 1
                if warmup_count % 10 == 0:
                    Log(f"Warm-up progress: {warmup_count}/{warmup_target}")
        Sleep(5000)

    Log("Data warm-up completed, starting initial return rate predictor training...")
    strategy.train_model()

    # Main trading loop
    loop_count = 0
    while True:
        loop_count += 1

        # Get K-line data
        records = exchange.GetRecords()
        if not records or len(records) < 55:
            Sleep(LoopInterval * 1000)
            continue

        # Data processing
        data_updated = strategy.collect_data(records)

        # Check if retraining is needed
        if strategy.should_retrain():
            Log("๐Ÿ”„ Retraining return rate predictor...")
            strategy.train_model()

        # Get trading signals
        score, market_type = strategy.get_trading_signals(records)

        # Get current real-time price
        ticker = exchange.GetTicker()
        if ticker:
            current_price = ticker['Last']
        else:
            current_price = records[-1]['Close']

        # Get current parameters
        params = strategy.param_manager.get_params(market_type)

        # Priority check for stop loss/take profit (using real-time price)
        if strategy.position_state != strategy.POSITION_NONE:
            if strategy.check_stop_loss_take_profit(current_price, params):
                Log("โšก Stop loss/take profit triggered, executed close position")
                Sleep(LoopInterval * 1000)
                continue

        # Execute trading logic (only when there's new data)
        if data_updated:
            strategy.execute_trade_logic(score, market_type, current_price)

        # Status display
        pos_state_name = {
            strategy.POSITION_NONE: "No Position",
            strategy.POSITION_LONG: "Long", 
            strategy.POSITION_SHORT: "Short"
        }.get(strategy.position_state, "Unknown")

        data_status = "๐Ÿ“ŠNew Data" if data_updated else "โธ๏ธWaiting"
        model_status = "๐Ÿ”ฎPrediction" if strategy.scoring_system.is_model_trained else "๐Ÿ“ŠBasic"

        # Get entry conditions for display
        long_cond, short_cond = strategy.check_entry_conditions(score, market_type)
        signal_status = ""
        if long_cond:
            signal_status = "๐Ÿ“ˆLong"
        elif short_cond:
            signal_status = "๐Ÿ“‰Short"
        else:
            signal_status = "๐Ÿ”„Hold"

        # Display training sample count
        sample_count = len(strategy.feature_buffer)

        LogStatus(f"Loop: {loop_count}, Price: {current_price:.2f}, "
                  f"Prediction Score: {score:.1f}, Market: {market_type}, "
                  f"Position: {pos_state_name}, Signal: {signal_status}, "
                  f"Status: {data_status}, Mode: {model_status}, "
                  f"Samples: {sample_count}, "
                  f"Win: {strategy.counter['win']}, Loss: {strategy.counter['loss']}")

        Sleep(LoopInterval * 1000)

# ========== Parameter Settings ==========
AmountOP = 1  # Futures contract quantity
LoopInterval = 3  # Loop interval (seconds)

if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)