Xiao Ling

Posted on • Originally published at dynamsoft.com

Python Development: Real-Time 1D and 2D Barcode Scanning from an IP Camera

IP cameras are widely used for real-time monitoring, automation, and data capture. This article shows how to turn a USB camera into an IP camera for remote monitoring, and how to build a GUI application that scans 1D/2D barcodes from the IP camera stream using Python, PySide6, and Dynamsoft Barcode Reader.

Demo Video: IP Camera Barcode Scan

Prerequisites

Project Overview

The project contains two sub-projects:

  • IP camera server that streams video from a USB camera over the network. requirements.txt:

    Flask==2.3.3
    opencv-python==4.8.1.78
    numpy>=1.21.0,<2.0.0
    
  • GUI client that receives the stream and scans barcodes. requirements.txt:

    PySide6==6.7.2
    requests==2.31.0
    opencv-python==4.8.1.78
    numpy>=1.21.0,<2.0.0
    dynamsoft-capture-vision-bundle>=3.0.6000
    

Building the IP Camera Server

An IP (network) camera sends video over your LAN or the internet. A simple way to achieve this is by using HTTP MJPEG (a sequence of JPEG frames over HTTP).
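To make the MJPEG format concrete, here is a minimal sketch of how one JPEG image might be wrapped as a single part of a `multipart/x-mixed-replace` response. The helper names `mjpeg_part` and `mjpeg_content_type` are hypothetical, and the `Content-Length` header is an optional addition that the server in this article does not send.

```python
BOUNDARY = b"frame"  # must match the boundary declared in the response Content-Type


def mjpeg_part(jpeg_bytes, boundary=BOUNDARY):
    """Wrap one encoded JPEG image as a single multipart/x-mixed-replace part."""
    header = b"".join([
        b"--", boundary, b"\r\n",
        b"Content-Type: image/jpeg\r\n",
        # Optional header (an assumption of this sketch, not used by the article's server)
        b"Content-Length: ", str(len(jpeg_bytes)).encode("ascii"), b"\r\n",
        b"\r\n",
    ])
    return header + jpeg_bytes + b"\r\n"


def mjpeg_content_type(boundary=BOUNDARY):
    """Value for the HTTP Content-Type header of the whole streaming response."""
    return "multipart/x-mixed-replace; boundary=" + boundary.decode("ascii")


# Stand-in for real JPEG data: just the start and end markers around a payload
fake_jpeg = b"\xff\xd8demo\xff\xd9"
part = mjpeg_part(fake_jpeg)
```

The browser (or any MJPEG client) replaces the previously displayed image each time a new part arrives, which is what produces the appearance of video.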

Core Server Architecture

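import logging
import socket
import threading
import time
from datetime import datetime

import cv2
from flask import Flask, Response, jsonify, render_template_string


class IPCameraServer:
    def __init__(self, camera_index=0, host='0.0.0.0', port=5000,
                 resolution=(640, 480), fps=30):
        self.camera_index = camera_index
        self.host = host
        self.port = port
        self.resolution = resolution
        self.fps = fps

        self.app = Flask(__name__)
        self.camera = None
        self.frame = None              # latest captured frame, shared between threads
        self.lock = threading.Lock()   # guards self.frame and self.frame_count
        self.running = False
        self.frame_count = 0           # incremented by capture_frames(), reported by /status
        self.start_time = time.time()  # reset in start(); used for uptime in /status
        self.setup_routes()            # register the Flask routes defined below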

Camera Initialization and Management

The server implements camera initialization with OpenCV:

def initialize_camera(self):
    """Initialize USB camera with comprehensive error handling"""
    try:
        logging.info(f"Initializing camera {self.camera_index}...")
        self.camera = cv2.VideoCapture(self.camera_index)

        if not self.camera.isOpened():
            logging.error(f"Failed to open camera {self.camera_index}")
            return False

        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
        self.camera.set(cv2.CAP_PROP_FPS, self.fps)

        ret, frame = self.camera.read()
        if not ret or frame is None:
            logging.error("Failed to capture test frame")
            return False

        logging.info(f"Camera initialized: {self.resolution[0]}x{self.resolution[1]} @ {self.fps}fps")
        return True

    except Exception as e:
        logging.error(f"Camera initialization error: {e}")
        return False

Multi-threaded Frame Capture

For real-time video processing, the server uses a dedicated thread for continuous frame capture:

def capture_frames(self):
    """Capture frames in separate thread for optimal performance"""
    logging.info("Starting frame capture thread...")

    while self.running:
        try:
            if self.camera is None or not self.camera.isOpened():
                logging.warning("Camera unavailable, attempting reconnection...")
                if not self.initialize_camera():
                    time.sleep(5) 
                    continue

            ret, frame = self.camera.read()
            if not ret or frame is None:
                logging.warning("Failed to capture frame, retrying...")
                time.sleep(0.1)
                continue

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            cv2.putText(frame, timestamp, (10, 30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            with self.lock:
                self.frame = frame.copy()
                self.frame_count += 1

            time.sleep(1.0 / self.fps)

        except Exception as e:
            logging.error(f"Frame capture error: {e}")
            time.sleep(1)
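The capture thread and the HTTP handlers share only the most recent frame, protected by a lock. The following sketch isolates that pattern with the standard library alone. `LatestFrameBuffer` and `producer` are hypothetical names, and plain integers stand in for camera frames.

```python
import threading
import time


class LatestFrameBuffer:
    """Holds only the most recent frame; older frames are overwritten."""

    def __init__(self):
        self._lock = threading.Lock()
        self._frame = None
        self._count = 0

    def put(self, frame):
        with self._lock:
            self._frame = frame
            self._count += 1

    def get(self):
        with self._lock:
            return self._frame, self._count


def producer(buffer, stop_event, total_frames):
    """Stand-in for the capture loop: the 'frames' are just integers here."""
    for i in range(total_frames):
        if stop_event.is_set():
            break
        buffer.put(i)
        time.sleep(0.001)  # simulate the time between camera frames


buffer = LatestFrameBuffer()
stop_event = threading.Event()
worker = threading.Thread(target=producer, args=(buffer, stop_event, 50), daemon=True)
worker.start()
worker.join()  # a real server would keep the thread running instead

latest, count = buffer.get()
```

Because readers only ever see the newest frame, a slow client cannot cause frames to pile up in memory; it simply skips the ones it missed.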

MJPEG Streaming Implementation

Encode each frame as JPEG (quality 85) and send it over HTTP as one part of a multipart stream:

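def generate_frames(self):
    """Generate MJPEG frames for HTTP streaming"""
    while self.running:
        try:
            # Copy the latest frame under the lock, then release it before encoding
            with self.lock:
                frame = None if self.frame is None else self.frame.copy()

            if frame is None:
                # Nothing captured yet: wait briefly instead of spinning on the lock
                time.sleep(0.01)
                continue

            ret, buffer = cv2.imencode('.jpg', frame,
                                       [cv2.IMWRITE_JPEG_QUALITY, 85])
            if not ret:
                continue

            frame_bytes = buffer.tobytes()

            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' +
                   frame_bytes + b'\r\n')

            # Pace the stream so the same frame is not re-sent in a tight loop
            time.sleep(1.0 / self.fps)

        except Exception as e:
            logging.error(f"Frame generation error: {e}")
            time.sleep(0.1)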

Flask Routes and Web Interface

The server exposes multiple endpoints for different use cases:

def setup_routes(self):
    """Setup Flask routes for web interface and streaming"""

    @self.app.route('/')
    def index():
        """Main web interface with live video display"""
        return render_template_string(
            self.get_html_template(),
            resolution=self.resolution,
            host=self.host, 
            port=self.port,
            local_ip=self.get_local_ip()
        )

    @self.app.route('/video_feed')
    def video_feed():
        """MJPEG streaming endpoint - core functionality"""
        return Response(
            self.generate_frames(),
            mimetype='multipart/x-mixed-replace; boundary=frame'
        )

    @self.app.route('/status')
    def status():
        """JSON API endpoint for camera statistics"""
        current_time = time.time()
        uptime = int(current_time - self.start_time)
        current_fps = self.frame_count / max(1, uptime)

        return jsonify({
            'status': 'online' if self.running else 'offline',
            'fps': f"{current_fps:.1f}",
            'frames': self.frame_count,
            'uptime': uptime,
            'resolution': f"{self.resolution[0]}x{self.resolution[1]}",
            'timestamp': datetime.now().isoformat()
        })
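Note that the `fps` value reported by `/status` is an average over the whole uptime (total frames divided by seconds since start). If a figure that reacts faster to stalls is wanted, one possible alternative is a rolling window over recent frame timestamps. The `RollingFps` class below is a hypothetical sketch of that idea, not part of the article's server.

```python
from collections import deque


class RollingFps:
    """Estimate frames per second from timestamps inside a sliding window."""

    def __init__(self, window_seconds=2.0):
        self.window_seconds = window_seconds
        self._stamps = deque()

    def tick(self, now):
        """Record one frame captured at time `now` (in seconds)."""
        self._stamps.append(now)
        cutoff = now - self.window_seconds
        # Discard timestamps that have fallen out of the window
        while self._stamps and self._stamps[0] < cutoff:
            self._stamps.popleft()

    def value(self):
        """Frames per second across the timestamps currently in the window."""
        if len(self._stamps) < 2:
            return 0.0
        span = self._stamps[-1] - self._stamps[0]
        return (len(self._stamps) - 1) / span if span > 0 else 0.0


# Simulate a camera delivering one frame every 0.1 s for three seconds
fps = RollingFps(window_seconds=2.0)
for i in range(31):
    fps.tick(i / 10)
```

In the capture loop, `tick(time.monotonic())` could be called once per captured frame, with `value()` read from the status handler under the same lock.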

Network Configuration and Auto-Discovery

The server automatically detects the local IP address for easy network access:

def get_local_ip(self):
    """Automatically detect local IP address for network access"""
    try:
        # Connecting a UDP socket sends no packets; it only makes the OS
        # pick the outbound interface, whose address is then read back.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"

Server Startup and Management

Start up the server and handle any initialization errors:

def start(self):
    """Start IP camera server with full error handling"""
    try:
        logging.info("Starting USB IP Camera Server...")

        if not self.initialize_camera():
            logging.error("Camera initialization failed")
            return False

        self.running = True
        self.start_time = time.time()

        capture_thread = threading.Thread(
            target=self.capture_frames, daemon=True
        )
        capture_thread.start()

        local_ip = self.get_local_ip()
        logging.info(f"Server starting on {self.host}:{self.port}")
        logging.info(f"Web interface: http://{local_ip}:{self.port}")
        logging.info(f"Stream URL: http://{local_ip}:{self.port}/video_feed")

        self.app.run(
            host=self.host, 
            port=self.port,
            debug=False, 
            threaded=True, 
            use_reloader=False
        )

    except KeyboardInterrupt:
        logging.info("Shutdown signal received")
        self.stop()
    except Exception as e:
        logging.error(f"Server error: {e}")
        self.stop()
        return False

Usage Example

Starting the IP camera server is straightforward:

if __name__ == "__main__":
    server = IPCameraServer(
        camera_index=0,        
        host='0.0.0.0',        
        port=5000,             
        resolution=(640, 480), 
        fps=30                 
    )

    server.start()

Once running, the server provides:

  • Web Interface: http://localhost:5000
  • Stream Endpoint: http://localhost:5000/video_feed
  • Status API: http://localhost:5000/status

[Image: IP camera server]
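To check the stream endpoint without a browser, the MJPEG response can be read with the standard library alone. The sketch below pulls complete JPEG images out of the byte stream by scanning for the JPEG start and end markers. The function names are hypothetical, and marker scanning is a simplification: it assumes the JPEGs carry no embedded thumbnails, which could contain their own end marker.

```python
from urllib.request import urlopen

SOI = b"\xff\xd8"  # JPEG start-of-image marker
EOI = b"\xff\xd9"  # JPEG end-of-image marker


def extract_jpegs(buffer):
    """Pop every complete JPEG from `buffer` (a bytearray); keep any partial tail."""
    frames = []
    while True:
        start = buffer.find(SOI)
        if start == -1:
            # No frame start yet; keep the last byte in case a marker is split
            del buffer[:-1]
            break
        end = buffer.find(EOI, start + 2)
        if end == -1:
            # Frame started but not finished; drop the headers before it and wait
            del buffer[:start]
            break
        frames.append(bytes(buffer[start:end + 2]))
        del buffer[:end + 2]
    return frames


def iter_mjpeg_frames(url, chunk_size=4096):
    """Yield JPEG images from an MJPEG endpoint such as /video_feed."""
    buffer = bytearray()
    with urlopen(url) as response:
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            buffer.extend(chunk)
            yield from extract_jpegs(buffer)
```

For example, `next(iter_mjpeg_frames("http://localhost:5000/video_feed"))` should return the bytes of one JPEG image while the server is running, which can then be written to a `.jpg` file for inspection.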

Building a Python GUI App for IP Camera Streaming and Barcode Scanning

To scan barcodes from the video stream:

  1. Construct the GUI layout using PySide6.
  2. Capture frames from the video feed using OpenCV.
  3. Process each frame with Dynamsoft Barcode Reader.
  4. Display the video feed and draw detected barcode results.

Camera URL to OpenCV Mat

OpenCV's VideoCapture class accepts a stream URL, so it can be used to continuously retrieve frames from the IP camera stream:

def connect_to_stream(self, url: str, protocol: str = "http"):
    """Connect to IP camera stream and initialize OpenCV capture"""
    self.cap = cv2.VideoCapture(url)
    if not self.cap.isOpened():
        raise ConnectionError("Failed to connect to camera stream")

def read_frames(self):
    """Continuously read frames from the video stream"""
    while self.cap.isOpened():
        ret, frame = self.cap.read()
        if not ret:
            break
        self.process_frame(frame)
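As written, `connect_to_stream` raises `ConnectionError` on the first failure and `read_frames` stops at the first failed read. For a network camera it may be worth retrying with growing delays. The following is a minimal sketch of that idea; `backoff_delays` and `connect_with_retry` are hypothetical helpers, and `open_stream` stands for any callable that raises `ConnectionError` on failure.

```python
import time


def backoff_delays(base=1.0, factor=2.0, maximum=30.0):
    """Yield an endless sequence of exponentially growing delays, capped at `maximum`."""
    delay = base
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


def connect_with_retry(open_stream, max_attempts=5, sleep=time.sleep):
    """Call `open_stream` until it succeeds or `max_attempts` is reached."""
    delays = backoff_delays()
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return open_stream()
        except ConnectionError as exc:
            last_error = exc
            if attempt < max_attempts:
                sleep(next(delays))  # wait 1 s, 2 s, 4 s, ... between attempts
    raise ConnectionError(f"gave up after {max_attempts} attempts") from last_error
```

With the viewer class from this article, the call might look like `connect_with_retry(lambda: viewer.connect_to_stream(url))`, where `viewer` and `url` are whatever the application already holds.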

OpenCV to Dynamsoft Barcode Reader Integration

  1. OpenCV Mat objects use BGR color format, while Dynamsoft Barcode Reader expects specific image data structures. Create a utility function to handle the conversion:

    def convertMat2ImageData(mat):
        """Convert OpenCV Mat to Dynamsoft ImageData format"""
        if len(mat.shape) == 3:
            height, width, channels = mat.shape
            pixel_format = EnumImagePixelFormat.IPF_RGB_888
        else:
            height, width = mat.shape
            channels = 1
            pixel_format = EnumImagePixelFormat.IPF_GRAYSCALED
    
        stride = width * channels
        imagedata = ImageData(mat.tobytes(), width, height, stride, pixel_format)
        return imagedata
    
  2. Create a custom image source adapter to append the converted image data to the barcode reader:

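    class BarcodeScanner(QObject):
        """Main barcode scanning engine with Dynamsoft integration"""
    
        def __init__(self):
            super().__init__()  # required when subclassing QObject
            # Bounded queue shared with the receiver; the size of 10 is illustrative
            self.result_queue = queue.Queue(maxsize=10)
    
            self.cvr = CaptureVisionRouter()
            self.fetcher = FrameFetcher()
            self.receiver = BarcodeResultReceiver(self.result_queue)
    
            self.cvr.set_input(self.fetcher)
            self.cvr.add_result_receiver(self.receiver)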
    
    class FrameFetcher(ImageSourceAdapter):
        """Custom frame fetcher for Dynamsoft Capture Vision"""
    
        def add_frame(self, image_data):
            """Add OpenCV frame to Dynamsoft processing queue"""
            self.add_image_to_buffer(image_data)
            self._has_frame = True
    
    class BarcodeResultReceiver(CapturedResultReceiver):
        """Custom result receiver for barcode detection results"""
    
        def __init__(self, result_queue):
            super().__init__()
            self.result_queue = result_queue
    
        def on_captured_result_received(self, result):
            """Called when barcode detection results are available"""
            try:
                self.result_queue.put_nowait(result)
            except queue.Full:
                try:
                    self.result_queue.get_nowait()
                    self.result_queue.put_nowait(result)
                except queue.Empty:
                    pass
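The result receiver above keeps the queue bounded by discarding the oldest result when it is full, so the GUI always sees recent detections. The sketch below shows that drop-oldest behaviour in isolation with plain integers. `put_drop_oldest` is a hypothetical helper, and unlike the receiver above it also tolerates the queue being refilled by another thread between the `get` and the second `put`.

```python
import queue


def put_drop_oldest(q, item):
    """Put `item` on a bounded queue, discarding the oldest entry if it is full."""
    try:
        q.put_nowait(item)
    except queue.Full:
        try:
            q.get_nowait()  # make room by dropping the oldest entry
        except queue.Empty:
            pass  # a consumer emptied the queue in the meantime
        try:
            q.put_nowait(item)
        except queue.Full:
            pass  # another producer took the free slot; drop this item


results = queue.Queue(maxsize=3)
for n in range(5):
    put_drop_oldest(results, n)

remaining = [results.get_nowait() for _ in range(results.qsize())]
print(remaining)  # [2, 3, 4]
```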
    

Real-Time Barcode Processing and Overlay Drawing

  1. Detect barcodes in real-time from the IP camera stream:

    def process_frame(self, mat):
        """Process OpenCV frame for barcode detection"""
        if not self.is_scanning:
            return
    
        try:
            image_data = convertMat2ImageData(mat)
    
            self.fetcher.add_frame(image_data)
    
        except Exception as e:
            # Log instead of silently discarding the error
            logging.debug(f"Frame processing error: {e}")
    
  2. Draw barcode detection results as overlays on the video stream:

    def _draw_barcode_overlay_on_mat(self, mat):
        """Draw barcode overlays directly on OpenCV Mat"""
        fresh_barcodes = self.barcode_scanner.get_fresh_results()
    
        for barcode in fresh_barcodes:
            points = barcode['points']
            text = barcode['text']
    
            if not points:
                continue  # nothing to draw without corner points
    
            pts = np.array(points, dtype=np.int32)
            cv2.drawContours(mat, [pts], 0, (0, 255, 0), 3)
    
            # Label the barcode just above its first corner
            text_x, text_y = pts[0]
            cv2.putText(mat, text, (int(text_x), int(text_y) - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
    

    [Image: IP camera barcode scanner]
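The overlay code relies on `get_fresh_results()`, which is not shown in this article. One plausible way to implement such a method is to remember when the last detections arrived and stop returning them once they are older than a short time limit, so stale boxes do not linger on the video. The `FreshResults` class below is a hypothetical sketch of that idea and may differ from the implementation in the repository.

```python
import time


class FreshResults:
    """Keeps the latest detections and returns them only while they are recent."""

    def __init__(self, max_age=0.5, clock=time.monotonic):
        self.max_age = max_age  # seconds a detection stays drawable (assumed value)
        self._clock = clock
        self._items = []
        self._stamp = None

    def update(self, items):
        """Store a new batch of detections, e.g. dicts with 'text' and 'points'."""
        self._items = list(items)
        self._stamp = self._clock()

    def get_fresh(self):
        """Return the stored detections, or an empty list if they are too old."""
        if self._stamp is None or self._clock() - self._stamp > self.max_age:
            return []
        return list(self._items)
```

The injectable `clock` parameter exists only to make the class easy to test; in the application the default `time.monotonic` would be used.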

Source Code

https://github.com/yushulx/python-barcode-qrcode-sdk/tree/main/examples/official/ip_camera
