| import os |
| import streamlit as st |
| import requests |
| import cv2 |
| import numpy as np |
| from pathlib import Path |
| import subprocess |
| import shutil |
|
|
| |
# Remote Gradio Space endpoint that make_request() posts to.
# NOTE(review): placeholder URL — must be replaced with the real Space URL.
url = 'https://your-gradio-space-url/api/endpoint'


# Bearer token read from the WF environment variable.
# NOTE(review): os.getenv returns None when WF is unset, so make_request()
# would then send the literal header "Bearer None" — confirm WF is set in
# the deployment environment.
write_token = os.getenv('WF')
|
|
| |
def make_request(data, *, timeout=30):
    """POST *data* as JSON to the configured Gradio endpoint and return the
    decoded JSON response.

    Parameters
    ----------
    data : dict
        JSON-serializable request payload.
    timeout : float, keyword-only, default 30
        Seconds before the HTTP request is aborted. The original call had no
        timeout, so an unresponsive endpoint could hang the app forever.

    Returns
    -------
    The parsed JSON body of the response.

    Raises
    ------
    requests.exceptions.RequestException on network failure or timeout;
    ValueError if the response body is not valid JSON.
    """
    headers = {
        'Authorization': f'Bearer {write_token}',
    }
    # timeout= is the only behavioral addition; existing callers are unaffected.
    response = requests.post(url, json=data, headers=headers, timeout=timeout)
    return response.json()
|
|
| |
| video_file = st.file_uploader("Upload video", type=["mp4", "mov"]) |
| logo_file = st.file_uploader("Upload logo PNG", type=["png"]) |
| logo_size_option = st.selectbox("Logo size", [64, 128, 256]) |
|
|
| |
| shader_option = st.radio("Apply Shader?", ["Yes", "No"]) |
| shader_code = """ |
| def shader_frame(width, height, t): |
| x = np.linspace(0, 1, width) |
| y = np.linspace(0, 1, height) |
| xv, yv = np.meshgrid(x, y) |
| r = ((xv + 0.5*np.sin(t)) % 1.0 * 255).astype(np.uint8) |
| g = ((yv + 0.5*np.cos(t)) % 1.0 * 255).astype(np.uint8) |
| b = (0.5*255*np.ones_like(r)).astype(np.uint8) |
| frame = np.stack([b, g, r], axis=2) |
| return frame |
| """ |
| if shader_option == "Yes": |
| st.text_area("Current shader code", shader_code, height=200) |
|
|
| |
| video_path = Path("video.mp4") |
| if video_file: |
| video_path.write_bytes(video_file.getbuffer()) |
|
|
| status_placeholder = st.empty() |
|
|
| |
def shader_frame(width, height, t):
    """Procedurally generate a ``(height, width, 3)`` uint8 BGR frame at time *t*.

    The red channel drifts horizontally with ``sin(t)`` and the green channel
    vertically with ``cos(t)``; blue is held at a constant mid level. Channels
    are stacked blue-first for OpenCV compatibility.
    """
    xs = np.linspace(0.0, 1.0, width)
    ys = np.linspace(0.0, 1.0, height)
    grid_x, grid_y = np.meshgrid(xs, ys)

    red = (((grid_x + 0.5 * np.sin(t)) % 1.0) * 255).astype(np.uint8)
    green = (((grid_y + 0.5 * np.cos(t)) % 1.0) * 255).astype(np.uint8)
    # 0.5 * 255 = 127.5 truncates to 127 under the original uint8 cast.
    blue = np.full_like(red, 127)

    return np.stack([blue, green, red], axis=2)
|
|
| |
# --- Logo decoding & scaling ----------------------------------------------
# Decode the uploaded PNG (keeping any alpha channel) and scale it so its
# longest side matches the selected size. `logo` stays None when nothing was
# uploaded or decoding fails, which downstream code checks before overlaying.
logo = None
has_alpha = False
if logo_file:
    logo_data = np.frombuffer(logo_file.read(), np.uint8)
    # IMREAD_UNCHANGED preserves the 4th (alpha) channel when present.
    logo = cv2.imdecode(logo_data, cv2.IMREAD_UNCHANGED)
    if logo is None:
        # imdecode signals corrupt/unsupported data by returning None rather
        # than raising — the original code crashed on logo.shape in that case.
        st.error("Could not decode the uploaded logo PNG.")
    else:
        has_alpha = logo.shape[2] == 4 if logo.ndim == 3 else False
        st.text(f"Logo loaded, transparency: {'Yes' if has_alpha else 'No'}")

        # Aspect-ratio-preserving scale; guarded by the branch above so the
        # resize can never run on a missing/undecodable logo.
        lh, lw = logo.shape[:2]
        scale = logo_size_option / max(lw, lh)
        new_w = int(lw * scale)
        new_h = int(lh * scale)
        logo = cv2.resize(logo, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
|
| |
def overlay_logo(frame, logo, has_alpha):
    """Paste *logo* onto *frame* in the bottom-right corner, 10 px from each edge.

    With ``has_alpha`` the logo's alpha channel drives per-pixel blending;
    otherwise the logo's first three channels overwrite the frame directly.
    The frame is modified in place and returned for convenience.
    """
    frame_h, frame_w = frame.shape[:2]
    logo_h, logo_w = logo.shape[:2]
    top, bottom = frame_h - logo_h - 10, frame_h - 10
    left, right = frame_w - logo_w - 10, frame_w - 10
    # View into the destination rectangle — writes land in `frame` itself.
    region = frame[top:bottom, left:right]

    if not has_alpha:
        region[:] = logo[:, :, :3]
    else:
        weight = logo[:, :, 3] / 255.0
        for channel in range(3):
            region[:, :, channel] = (
                weight * logo[:, :, channel]
                + (1 - weight) * region[:, :, channel]
            )
    return frame
|
|
| |
def has_nvenc():
    """Return True when the local ffmpeg build lists the h264_nvenc encoder.

    Any failure to invoke ffmpeg (missing binary, permission error, ...) is
    treated as NVENC being unavailable.
    """
    probe_cmd = ["ffmpeg", "-encoders"]
    try:
        listing = subprocess.run(probe_cmd, capture_output=True, text=True).stdout
    except Exception:
        return False
    return "h264_nvenc" in listing
|
|
| |
def generate_preview():
    """Render the uploaded video with the optional shader/logo overlays and
    re-encode it (muxing the original audio back in) as preview.mp4.

    Reads module-level state: video_file / video_path (the upload),
    shader_option, logo / has_alpha, and status_placeholder. All output is
    Streamlit UI side effects plus temp_frames.mp4 / preview.mp4 on disk;
    returns nothing.
    """
    # Guard clause: nothing to do until the user has uploaded a video.
    if not video_file:
        status_placeholder.warning("Upload a video first!")
        return

    cap = cv2.VideoCapture(str(video_path))
    # CAP_PROP_FPS can come back 0.0 for some containers; fall back to 30.
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    st.text(f"Preview resolution: {width} x {height} @ {fps:.1f} FPS")

    # Intermediate, video-only file; audio is muxed back in with ffmpeg below.
    tmp_out = Path("temp_frames.mp4")
    out = cv2.VideoWriter(str(tmp_out),
                          cv2.VideoWriter_fourcc(*'mp4v'),
                          fps, (width, height))

    t = 0.0          # shader time, advanced by one frame period per frame
    dt = 1.0 / fps
    frame_idx = 0

    preview_display = st.empty()
    progress_bar = st.progress(0)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Blend the procedural shader over the frame (70% video / 30% shader).
        if shader_option == "Yes":
            shader = shader_frame(width, height, t)
            blended = cv2.addWeighted(frame, 0.7, shader, 0.3, 0)
        else:
            blended = frame.copy()

        if logo is not None:
            blended = overlay_logo(blended, logo, has_alpha)

        out.write(blended)

        # NOTE(review): updating the Streamlit image on every frame is slow
        # for long videos — consider throttling to every Nth frame.
        preview_small = cv2.resize(blended, (width // 2, height // 2))
        preview_display.image(preview_small, channels="BGR")

        t += dt
        frame_idx += 1
        # max(total_frames, 1) avoids division by zero when the container
        # reports an unknown (0) frame count.
        progress_bar.progress(min(frame_idx / max(total_frames, 1), 1.0))

    cap.release()
    out.release()

    # Re-encode the processed video and mux in the original audio track,
    # preferring the NVENC hardware encoder when ffmpeg exposes it.
    output_path = Path("preview.mp4")
    encoder = "h264_nvenc" if has_nvenc() else "libx264"
    st.info(f"Encoding with: {encoder}")

    cmd = [
        "ffmpeg", "-y",
        "-i", str(tmp_out),        # input 0: processed frames (video only)
        "-i", str(video_path),     # input 1: original upload (audio source)
        "-map", "0:v:0",
        "-map", "1:a:0",           # NOTE(review): fails when the upload has no audio track
        "-c:v", encoder,
        "-preset", "fast",
        "-b:v", "5M",
        "-c:a", "aac",
        str(output_path)
    ]

    try:
        subprocess.run(cmd, check=True)
        st.success("✅ Preview generated successfully with audio!")
        st.video(str(output_path))
        # NOTE(review): a relative markdown link does not serve local files in
        # Streamlit — st.download_button would be the working alternative.
        st.markdown(f"[📥 Download preview](./{output_path})")
    except subprocess.CalledProcessError as e:
        st.error("❌ FFmpeg failed to merge audio.")
        st.text(e)
|
|
| |
| if st.button("Generate Preview"): |
| generate_preview() |
|
|
| if st.button("Go Live"): |
| if not video_file: |
| st.warning("Upload a video first!") |
| else: |
| st.info("Starting live stream...") |
| |
| live_stream_url = "rtmp://your-rtmp-server/live/stream" |
| live_stream_cmd = [ |
| "ffmpeg", "-re", |
| "-i", str(video_path), |
| "-c:v", "libx264", |
| "-preset", "fast", |
| "-b:v", "5M", |
| "-c:a", "aac", |
| "-f", "flv", |
| live_stream_url |
| ] |
| try: |
| subprocess.Popen(live_stream_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| st.success("✅ Live stream started successfully!") |
| except subprocess.CalledProcessError as e: |
| st.error("❌ Failed to start live stream.") |
| st.text(e) |