import os
import time
import requests
from langchain.tools import tool
# OutX API key, read from the environment at import time. A missing
# OUTX_API_KEY variable raises KeyError here, before any tool is defined.
OUTX_API_KEY = os.environ["OUTX_API_KEY"]
# Base URL that the endpoint paths used below are appended to.
BASE_URL = "https://api.outx.ai"
# Headers for the requests below that send a JSON body (the POST calls);
# the GET calls below pass only the x-api-key header.
HEADERS = {"x-api-key": OUTX_API_KEY, "Content-Type": "application/json"}
@tool
def fetch_linkedin_profile(profile_slug: str) -> dict:
    """Fetch a LinkedIn profile by slug. Returns name, headline, location, experience."""
    # NOTE: the docstring above is kept verbatim on purpose. LangChain's @tool
    # exposes it as the tool description, so editing it changes what the
    # model sees.
    #
    # Flow: POST to create a fetch task, then poll the task status endpoint
    # until the task completes, fails, or the polling budget is spent.
    #
    # Returns:
    #   - the task's "task_output" when the status is "completed";
    #   - {"error": "Task creation failed", "output": <response body>} when
    #     the create call returns no "api_agent_task_id";
    #   - {"error": "Task failed", "output": <task_output>} on "failed";
    #   - {"error": "Task timed out"} when no terminal status is seen.
    #
    # Network-level errors raised by requests (including timeouts) propagate.
    max_polls = 30
    poll_interval_s = 5
    # Without an explicit timeout requests may block indefinitely on a
    # stalled connection, which would hang the calling agent.
    http_timeout_s = 30

    create_resp = requests.post(
        f"{BASE_URL}/linkedin-agent/fetch-profile",
        headers=HEADERS,
        json={"profile_slug": profile_slug},
        timeout=http_timeout_s,
    )
    created = create_resp.json()
    task_id = (
        created.get("api_agent_task_id") if isinstance(created, dict) else None
    )
    if task_id is None:
        # Surface the API's own response instead of an opaque KeyError.
        return {"error": "Task creation failed", "output": created}

    for attempt in range(max_polls):
        result = requests.get(
            f"{BASE_URL}/linkedin-agent/get-task-status",
            headers={"x-api-key": OUTX_API_KEY},
            params={"api_agent_task_id": task_id},
            timeout=http_timeout_s,
        ).json()
        data = result["data"]
        status = data["status"]
        if status == "completed":
            return data["task_output"]
        if status == "failed":
            return {"error": "Task failed", "output": data["task_output"]}
        # Skip the pause after the last poll: nothing is left to wait for.
        if attempt < max_polls - 1:
            time.sleep(poll_interval_s)
    return {"error": "Task timed out"}
@tool
def get_watchlist_posts(watchlist_id: str, page: int = 1) -> dict:
    """Get recent LinkedIn posts from a watchlist. Returns post content, author, engagement metrics."""
    # NOTE: the docstring above is kept verbatim on purpose. LangChain's @tool
    # exposes it as the tool description, so editing it changes what the
    # model sees.
    #
    # Issues one GET to the /api-posts endpoint, sorted most-recent first,
    # and returns the decoded JSON body as-is (no status-code filtering, so
    # a JSON error body from the API is returned to the caller too).
    # Network-level errors raised by requests (including timeouts) propagate.
    query = {
        "watchlist_id": watchlist_id,
        "sort_by": "recent_first",
        "page": page,
    }
    resp = requests.get(
        f"{BASE_URL}/api-posts",
        headers={"x-api-key": OUTX_API_KEY},
        params=query,
        # Without an explicit timeout requests may block indefinitely on a
        # stalled connection, which would hang the calling agent.
        timeout=30,
    )
    return resp.json()
@tool
def comment_on_post(post_id: str, user_email: str, comment_text: str) -> dict:
    """Comment on a LinkedIn post from a watchlist. Requires post_id from get_watchlist_posts."""
    # NOTE: the docstring above is kept verbatim on purpose. LangChain's @tool
    # exposes it as the tool description, so editing it changes what the
    # model sees.
    #
    # Issues one POST to the /api-comment endpoint with actor_type fixed to
    # "user", and returns the decoded JSON body as-is (no status-code
    # filtering, so a JSON error body from the API is returned too).
    # Network-level errors raised by requests (including timeouts) propagate.
    # The call is not retried here, so a timeout cannot by itself produce a
    # duplicate comment from this function.
    payload = {
        "post_id": post_id,
        "user_email": user_email,
        "comment_text": comment_text,
        "actor_type": "user",
    }
    resp = requests.post(
        f"{BASE_URL}/api-comment",
        headers=HEADERS,
        json=payload,
        # Without an explicit timeout requests may block indefinitely on a
        # stalled connection, which would hang the calling agent.
        timeout=30,
    )
    return resp.json()