twitterapi.io is an independent third-party service. Not affiliated with X Corp. "Twitter" and "X" are trademarks of X Corp.
Fetching complete historical Twitter/X data looks simple until you actually do it. In April 2026 we pulled 43,769 tweets for a single cashtag spanning 2019 to today — but only after working around two API regressions. This guide documents the verified strategy and ships two production Python scripts.
Historical queries can return duplicate tweets with a changing cursor, causing infinite loops. Our verified fix: don't use cursor for historical scraping. Use time-window sliding instead.

The since:YYYY-MM-DD and until:YYYY-MM-DD filters no longer work. Use the Unix-timestamp variants since_time:<unix> and until_time:<unix>, in seconds.

Everything below is updated for these changes. Older tutorials (including the earlier version of this page) will not produce complete results.
Below is a side-by-side of the old recipe most blog posts still recommend and the verified-working 2026 approach. If you're porting existing code, these three lines are the diff:
Old recipe (no longer reliable):
# cursor + max_id pagination
params["cursor"] = cursor
params["query"] = f"{q} max_id:{last_id}"
# date-based filters
q = "$AAPL since:2023-01-01 until:2024-01-01"
# single thread, sequential

Verified 2026 approach:
# time-window sliding (no cursor)
q = f"$AAPL since_time:{since_ts} until_time:{current_until}"
# next iteration: current_until = earliest - 1
# Unix timestamp filters
since_ts = 1672531200 # = 2023-01-01 UTC
# parallel: 10-15 threads per day/hour

cursor + max_id approach fails

When your query reaches tweets from roughly 2019–2022, the API occasionally returns the same tweets again but with a different cursor value. Your paginator thinks it's on a new page and keeps going forever.
| Scenario | Cursor behavior |
|---|---|
| Recent data (2025–2026) | Stable, reproducible |
| Historical data (2019–2022) | Returns duplicates with shifting cursor → infinite loop |
| Combined with since_time | Triggers the bug even more often |
Takeaway: don't rely on cursor for historical scraping. It still works for short recent queries, so you'll see tutorials recommending it — they just weren't written against older data.
since:YYYY-MM-DD filter no longer matches

Old search syntax like since:2023-01-01 until:2024-01-01 is no longer honored by the underlying index. Queries may return unexpectedly empty results or ignore the bound entirely.
Switch to the Unix-timestamp (seconds) variants: since_time:<unix_seconds> for the lower bound and until_time:<unix_seconds> for the upper bound. Results include since_time and exclude until_time; use earliest - 1 as the next until_time to avoid overlap.
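A minimal sketch of building a timestamp-bounded query string. The helper names `to_unix` and `build_query` are illustrative assumptions, not part of any API; only the `since_time:` and `until_time:` operators come from the text above.

```python
from datetime import datetime, timezone


def to_unix(year: int, month: int, day: int) -> int:
    """Return the Unix timestamp (seconds) for midnight UTC on the given date."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp())


def build_query(keyword: str, since_ts: int, until_ts: int) -> str:
    """Compose a search query bounded by since_time / until_time (seconds)."""
    return f"{keyword} since_time:{since_ts} until_time:{until_ts}"


since_ts = to_unix(2023, 1, 1)
until_ts = to_unix(2024, 1, 1)
query = build_query("$AAPL", since_ts, until_ts)
print(query)
```

Building the datetime with an explicit UTC timezone matters here: a naive datetime would be interpreted in the machine's local timezone and shift the window boundaries.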
until_time sliding hits gaps

If you skip the cursor entirely and just slide until_time backwards from "now" to 2019, you'll find the API returns empty results for some timestamp windows that actually contain tweets. Move the window by one day and data reappears.
This is an index-layer bug, not a client mistake. Never stop on an empty response. If your code does if not tweets: break, you lose entire days of data.
Our fix: bound each query to a fixed day (or hour) window. If one window is genuinely empty, you skip one day — not the rest of history.
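The idea of bounded windows can be sketched as follows. This is an illustration under assumptions, not the reference implementation: `day_windows`, `scan`, and `fake_fetch` are hypothetical names, and `fake_fetch` stands in for the real API by simulating an index gap on one day.

```python
from datetime import datetime, timedelta, timezone


def day_windows(start: datetime, end: datetime):
    """Yield (since_ts, until_ts, label) for each UTC day in [start, end)."""
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=1), end)
        yield int(cur.timestamp()), int(nxt.timestamp()), cur.strftime("%Y-%m-%d")
        cur = nxt


def scan(windows, fetch_window):
    """Visit every window; an empty result is recorded, never treated as the end."""
    collected, empty = [], []
    for since_ts, until_ts, label in windows:
        tweets = fetch_window(since_ts, until_ts)  # hypothetical fetch callable
        if not tweets:
            empty.append(label)  # note the gap and keep going
            continue
        collected.extend(tweets)
    return collected, empty


def fake_fetch(since_ts, until_ts):
    """Stand-in for the API: pretends the index has a gap on 2021-03-02."""
    gap_start = int(datetime(2021, 3, 2, tzinfo=timezone.utc).timestamp())
    return [] if since_ts == gap_start else [{"id": str(since_ts)}]


start = datetime(2021, 3, 1, tzinfo=timezone.utc)
end = datetime(2021, 3, 4, tzinfo=timezone.utc)
tweets, empty_days = scan(day_windows(start, end), fake_fetch)
print(len(tweets), empty_days)
```

Recording empty windows instead of silently skipping them also gives you a list of days worth re-probing later.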
Four primitives, combined, give you full coverage at high throughput without infinite loops:
Split the full time range into small windows. Each window becomes an independent unit of work — bounded scope, isolated failures.
Slide until_time inside each window. Within a window, start from the upper bound. Each API call returns at most 20 tweets; take the earliest timestamp, subtract 1 second, and use that as the next until_time. Repeat until fewer than 20 come back.
Since each window is independent, run many at once. We've found 10–15 threads optimal — higher risks hitting rate limits without meaningful throughput gain.
Near window boundaries, and on occasional retries, the API returns the same tweet twice. A shared seen_ids set across threads handles it.
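The sliding and deduplication steps described above can be sketched against an in-memory stand-in for the API. Everything here is an assumption for illustration: `slide_window` and `fake_search` are hypothetical names, and the fake tweets carry a numeric `ts` field instead of the real `createdAt` string to keep the example short.

```python
PAGE_SIZE = 20  # per-call cap stated in the text


def slide_window(since_ts, until_ts, search, seen_ids):
    """Collect tweets in one window by moving until_time backwards."""
    out = []
    current_until = until_ts
    while current_until > since_ts:
        page = search(since_ts, current_until)
        if not page:
            break  # this window is exhausted (or genuinely empty)
        for t in page:
            if t["id"] not in seen_ids:  # dedup across pages and windows
                seen_ids.add(t["id"])
                out.append(t)
        earliest = min(t["ts"] for t in page)
        if earliest >= current_until:
            break  # no progress: guard against an infinite loop
        current_until = earliest - 1
        if len(page) < PAGE_SIZE:
            break  # a short page means nothing older remains
    return out


# Fake index: 45 tweets with distinct timestamps 1000, 1010, ..., 1440.
DATA = [{"id": str(i), "ts": 1000 + 10 * i} for i in range(45)]


def fake_search(since_ts, until_ts):
    """Newest-first results, inclusive lower bound, exclusive upper bound."""
    hits = [t for t in DATA if since_ts <= t["ts"] < until_ts]
    hits.sort(key=lambda t: t["ts"], reverse=True)
    return hits[:PAGE_SIZE]


seen = set()
result = slide_window(1000, 2000, fake_search, seen)
print(len(result))
```

With 45 tweets and a page size of 20, the loop makes three calls (20, 20, then 5) and stops on the short page.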
| Strategy | Verdict |
|---|---|
| Whole range + cursor | Infinite loop on historical |
| Whole range + pure until_time | Loses data at gap windows, can't parallelize |
| Per-day + cursor | Still triggers cursor bug |
| Per-day + until_time slide | Stable, parallelizable, complete |
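To illustrate why independent windows parallelize cleanly, here is a small sketch with a lock-protected shared ID set. The `claim`, `fetch_window`, and `run` names are hypothetical, and `fetch_window` fabricates IDs instead of calling the API: adjacent windows deliberately share one boundary ID so the deduplication has something to catch.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

seen_ids = set()
seen_lock = threading.Lock()


def claim(tweet_id: str) -> bool:
    """Return True only for the first thread that sees this ID."""
    with seen_lock:
        if tweet_id in seen_ids:
            return False
        seen_ids.add(tweet_id)
        return True


def fetch_window(window_index: int):
    """Stand-in for a per-window fetch; windows 2k and 2k+1 share a boundary ID."""
    ids = [f"w{window_index}-{k}" for k in range(3)]
    ids.append(f"boundary-{window_index // 2}")
    return [i for i in ids if claim(i)]


def run(windows, workers=10):
    """Fetch all windows concurrently and merge the deduplicated results."""
    results = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(fetch_window, w) for w in windows]
        for fut in as_completed(futures):
            results.extend(fut.result())
    return results


unique = run(range(8))
print(len(unique))
```

The check and the insert happen under the same lock, so two threads can never both claim the same ID, regardless of which window finishes first.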
-filter:retweets in the query doesn't always exclude retweets. Do a second check in code via the isRetweet field. Our reference code does this.

Most of Twitter's advanced search operators still work. The notable change is time bounds:
| Purpose | Syntax | Status |
|---|---|---|
| Lower time bound | since_time:1672531200 | ✓ Works |
| Upper time bound | until_time:1704067200 | ✓ Works |
| Legacy date bound | since:2023-01-01 | ✗ Not supported |
| Legacy date bound | until:2024-01-01 | ✗ Not supported |
| From specific user | from:elonmusk | ✓ Works |
| Skip replies | -filter:replies | ✓ Works |
| Skip retweets | -filter:retweets | ⚠ Partial — verify in code with isRetweet |
| Minimum likes | min_faves:100 | ✓ Works |
For the full operator list, see igorbrigadir/twitter-advanced-search — most still apply. Just swap since: / until: for the _time variants.
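As a rough sketch of combining the operators from the table with the in-code retweet check: `compose_query` and `drop_retweets` are hypothetical helper names, and the sample tweets are made up. Only the operators and the `isRetweet` field come from the text above.

```python
def compose_query(keyword: str, since_ts: int, until_ts: int,
                  skip_replies: bool = True) -> str:
    """Join a keyword with time bounds and filter operators."""
    parts = [keyword, f"since_time:{since_ts}", f"until_time:{until_ts}",
             "-filter:retweets"]
    if skip_replies:
        parts.append("-filter:replies")
    return " ".join(parts)


def drop_retweets(tweets):
    """Second-pass filter, since -filter:retweets is only partially honored."""
    return [t for t in tweets if not t.get("isRetweet", False)]


query = compose_query("from:elonmusk", 1672531200, 1704067200)
sample = [
    {"id": "1", "isRetweet": False},
    {"id": "2", "isRetweet": True},   # leaked through the query filter
    {"id": "3"},                      # field missing: treated as not a retweet
]
kept = drop_retweets(sample)
print(query)
print([t["id"] for t in kept])
```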
Each API call caps at 20 tweets, so a window producing 1000 tweets needs ~50 calls. Pick a shard size that keeps the number of calls per shard manageable:
| Daily tweet volume | Recommended shard | Typical calls / shard |
|---|---|---|
| < 50 | 1 day | 1–3 |
| 50–500 | 1 hour | 1–3 |
| > 500 | 15–30 min | 1–5 |
Unsure? Probe a single day first, then divide the resulting count by the number of shards per day to estimate tweets per shard. For perspective: $AMD on a busy day = ~660 tweets → hourly sharding → ~44 calls → ~12 seconds on 15 threads.
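The sizing rule can be turned into a back-of-the-envelope estimate. This sketch assumes tweets are spread evenly across shards, which real traffic is not, so treat the result as a rough figure; `recommend_shard_minutes` and `estimate_calls` are hypothetical helpers that follow the thresholds in the table above.

```python
import math

PAGE_SIZE = 20  # per-call cap stated in the text


def recommend_shard_minutes(daily_tweets: int) -> int:
    """Map daily volume to a shard length in minutes, following the table."""
    if daily_tweets < 50:
        return 24 * 60  # one shard per day
    if daily_tweets <= 500:
        return 60       # hourly shards
    return 30           # upper end of the 15-30 minute range


def estimate_calls(daily_tweets: int, shard_minutes: int) -> int:
    """Rough call count for one day, assuming an even spread across shards."""
    shards = (24 * 60) // shard_minutes
    per_shard = daily_tweets / shards
    calls_per_shard = max(1, math.ceil(per_shard / PAGE_SIZE))
    return shards * calls_per_shard


probe_count = 660  # e.g. the result of probing a single busy day
print(recommend_shard_minutes(probe_count))
print(estimate_calls(probe_count, 60))
```

Because every shard costs at least one call even when it is empty, very small shards on a quiet keyword waste calls; that is the trade-off the table is balancing.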
Full working implementation. Day-sharding + per-day until_time slide + 15 threads + checkpoint resume + dual-file storage. Used to pull 43k+ tweets for $AA across 2019–2026.
"""
Parallel historical tweet scraper
Strategy: day sharding + per-day until_time sliding + 15 threads
"""
import requests
import json
import time
import threading
from datetime import datetime, timezone, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

# === Config ===
API_URL = "https://api.twitterapi.io/twitter/tweet/advanced_search"
API_KEY = "YOUR_API_KEY"
KEYWORD = "$AA"  # Search keyword / cashtag / handle
QUERY_FILTERS = "-filter:replies"  # Skip replies, keep originals + quotes

# Time range (UTC)
START_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc)
END_DATE = datetime(2026, 4, 20, tzinfo=timezone.utc)
WORKERS = 15
OUTPUT_FILE = "tweets.jsonl"
RAW_FILE = "tweets_raw.jsonl"
PROGRESS_FILE = "progress.json"

# Thread-safety
write_lock = threading.Lock()
stats_lock = threading.Lock()
seen_ids = set()
seen_ids_lock = threading.Lock()
stats = {
    "total": 0, "api_calls": 0, "days_done": 0, "days_total": 0,
    "duplicates": 0, "retweet_filtered": 0, "quotes": 0,
    "originals": 0, "errors": 0, "empty_days": 0,
}


def parse_twitter_time(s: str) -> int:
    return int(datetime.strptime(s, "%a %b %d %H:%M:%S %z %Y").timestamp())


def standardize_tweet(t: dict) -> dict:
    a = t.get("author", {})
    e = t.get("entities", {})
    return {
        "tweet_id": t.get("id"),
        "created_at": t.get("createdAt"),
        "text_raw": t.get("text"),
        "language": t.get("lang"),
        "is_reply": t.get("isReply", False),
        "is_quote": t.get("quoted_tweet") is not None,
        "is_retweet": t.get("isRetweet", False),
        "hashtags": [h.get("text", "") for h in e.get("hashtags", [])],
        "cashtags": [s.get("text", "") for s in e.get("symbols", [])],
        "mentions": [m.get("screen_name", "") for m in e.get("user_mentions", [])],
        "urls": [u.get("expanded_url") or u.get("url", "") for u in e.get("urls", [])],
        "likes": t.get("likeCount", 0),
        "retweets": t.get("retweetCount", 0),
        "replies": t.get("replyCount", 0),
        "impressions": t.get("viewCount", 0),
        "author": {
            "id": a.get("id"), "username": a.get("userName"), "name": a.get("name"),
            "followers": a.get("followers", 0), "following": a.get("following", 0),
            "verified": a.get("isBlueVerified", False),
            "created_at": a.get("createdAt"),
        },
    }


def api_get(params, retries=3):
    headers = {"x-api-key": API_KEY}
    for i in range(retries):
        try:
            r = requests.get(API_URL, headers=headers, params=params, timeout=60)
            r.raise_for_status()
            return r.json()
        except Exception:
            if i < retries - 1:
                time.sleep(1)
            else:
                with stats_lock:
                    stats["errors"] += 1
    return {"tweets": []}


def fetch_day(since_ts: int, until_ts: int, date_str: str, f_out, f_raw):
    """
    Fetch all tweets in [since_ts, until_ts] using until_time sliding.
    Each call returns at most 20; use earliest-1 as next until_time.
    """
    day_calls = 0
    day_new = 0
    current_until = until_ts
    max_calls = 50  # safety cap
    while current_until > since_ts and day_calls < max_calls:
        query = f"{KEYWORD} since_time:{since_ts} until_time:{current_until} {QUERY_FILTERS}"
        data = api_get({"queryType": "Latest", "query": query})
        tweets = data.get("tweets", [])
        day_calls += 1
        if not tweets:
            break

        # Filter retweets in code (API flag isn't 100% reliable)
        valid = [t for t in tweets if not t.get("isRetweet", False)]
        with stats_lock:
            stats["retweet_filtered"] += len(tweets) - len(valid)

        batch_std, batch_raw = [], []
        for t in valid:
            tid = t.get("id")
            if not tid:
                continue
            with seen_ids_lock:
                if tid in seen_ids:
                    with stats_lock:
                        stats["duplicates"] += 1
                    continue
                seen_ids.add(tid)
            batch_std.append(json.dumps(standardize_tweet(t), ensure_ascii=False) + "\n")
            batch_raw.append(json.dumps(t, ensure_ascii=False) + "\n")
            day_new += 1
            with stats_lock:
                if t.get("quoted_tweet"):
                    stats["quotes"] += 1
                else:
                    stats["originals"] += 1

        if batch_std:
            with write_lock:
                f_out.writelines(batch_std)
                f_raw.writelines(batch_raw)

        # Slide: next until = earliest in this batch - 1 second
        earliest = min(parse_twitter_time(t["createdAt"]) for t in tweets)
        if earliest < current_until:
            current_until = earliest - 1
        else:
            break  # guard against infinite loop

        # Fewer than 20 = this window is exhausted
        if len(tweets) < 20:
            break

    with stats_lock:
        stats["total"] += day_new
        stats["api_calls"] += day_calls
        stats["days_done"] += 1
        if day_new == 0:
            stats["empty_days"] += 1
    return date_str, day_new, day_calls


def generate_days():
    days = []
    cur = START_DATE
    while cur < END_DATE:
        nxt = cur + timedelta(days=1)
        days.append((int(cur.timestamp()), int(nxt.timestamp()), cur.strftime("%Y-%m-%d")))
        cur = nxt
    return days


def load_progress():
    try:
        with open(PROGRESS_FILE, "r") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None


def save_progress(done_days):
    with open(PROGRESS_FILE, "w") as f:
        json.dump({
            "completed_days": done_days,
            "total": stats["total"],
            "api_calls": stats["api_calls"],
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }, f, indent=2)


def main():
    all_days = generate_days()
    stats["days_total"] = len(all_days)

    # Resume support
    progress = load_progress()
    completed = set()
    if progress:
        completed = set(progress.get("completed_days", []))
        stats["days_done"] = len(completed)
        try:
            with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
                for line in f:
                    try:
                        seen_ids.add(json.loads(line).get("tweet_id"))
                    except json.JSONDecodeError:
                        pass
            stats["total"] = len(seen_ids)
        except FileNotFoundError:
            pass
        mode = "a"
    else:
        mode = "w"

    pending = [(s, u, d) for s, u, d in all_days if d not in completed]
    print(f"Days: {stats['days_total']} total, {len(pending)} pending, {WORKERS} workers")
    f_out = open(OUTPUT_FILE, mode, encoding="utf-8")
    f_raw = open(RAW_FILE, mode, encoding="utf-8")
    start = time.time()
    done_list = list(completed)
    try:
        with ThreadPoolExecutor(max_workers=WORKERS) as ex:
            futures = {ex.submit(fetch_day, s, u, d, f_out, f_raw): d for s, u, d in pending}
            for fut in as_completed(futures):
                d, n, c = fut.result()
                done_list.append(d)
                elapsed = time.time() - start
                print(f"[{d}] +{n} ({c} calls) | {stats['days_done']}/{stats['days_total']} | total={stats['total']} | {elapsed:.0f}s", flush=True)
                if stats["days_done"] % 100 == 0:
                    f_out.flush()
                    f_raw.flush()
                    save_progress(done_list)
    except KeyboardInterrupt:
        print("Interrupted by user.")
    finally:
        f_out.close()
        f_raw.close()
        save_progress(done_list)
    elapsed = time.time() - start
    print(f"\n=== DONE === total={stats['total']} api_calls={stats['api_calls']} elapsed={elapsed:.0f}s")


if __name__ == "__main__":
    main()
Dependencies: pip install requests. Everything else is in the standard library.
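The dual-file storage mentioned above pays off when the derived schema changes: the raw file can be replayed without any API calls. The following is a sketch under assumptions, not part of the reference script. `reprocess` is a hypothetical helper, and `text_length` is an invented example field; the `id` and `text` keys are the ones the script above reads from each raw tweet.

```python
import json


def reprocess(raw_path: str, out_path: str) -> int:
    """Rebuild a derived JSONL file from raw tweets, adding a new field."""
    count = 0
    with open(raw_path, "r", encoding="utf-8") as src, \
         open(out_path, "w", encoding="utf-8") as dst:
        for line in src:
            line = line.strip()
            if not line:
                continue
            try:
                t = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip a partially written line
            row = {
                "tweet_id": t.get("id"),
                "text_raw": t.get("text"),
                # hypothetical new field, not in the original schema
                "text_length": len(t.get("text") or ""),
            }
            dst.write(json.dumps(row, ensure_ascii=False) + "\n")
            count += 1
    return count
```

A call such as `reprocess("tweets_raw.jsonl", "tweets_v2.jsonl")` would then regenerate the derived file; the output filename here is only an example.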
For busy keywords (breaking news, active cashtags like $AMD), day-level shards get too big. This script does hour-level sharding, same slide + parallel pattern. Completes a 663-tweet day in ~12 seconds.
"""
Single-day hot-keyword scraper
Strategy: hour sharding + per-hour until_time sliding + 15 threads
Best for keywords with high volume (e.g. $AMD, breaking news terms)
"""
import requests, json, time, threading
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed

API_URL = "https://api.twitterapi.io/twitter/tweet/advanced_search"
API_KEY = "YOUR_API_KEY"
KEYWORD = "$AMD"
QUERY_FILTERS = "-filter:replies"
DAY_START = int(datetime(2026, 4, 17, 0, 0, 0, tzinfo=timezone.utc).timestamp())
DAY_END = int(datetime(2026, 4, 18, 0, 0, 0, tzinfo=timezone.utc).timestamp())
WORKERS = 15
OUTPUT_FILE = "amd_0417.jsonl"

write_lock = threading.Lock()
stats_lock = threading.Lock()
seen_ids = set()
seen_ids_lock = threading.Lock()
stats = {"total": 0, "api_calls": 0, "slots_done": 0, "duplicates": 0}


def parse_time(s):
    return int(datetime.strptime(s, "%a %b %d %H:%M:%S %z %Y").timestamp())


def api_get(params, retries=3):
    h = {"x-api-key": API_KEY}
    for i in range(retries):
        try:
            r = requests.get(API_URL, headers=h, params=params, timeout=60)
            r.raise_for_status()
            return r.json()
        except Exception:
            if i < retries - 1:
                time.sleep(1)
    return {"tweets": []}


def fetch_slot(since_ts, until_ts, label, f_out):
    calls, added = 0, 0
    current_until = until_ts
    while current_until > since_ts and calls < 200:
        q = f"{KEYWORD} since_time:{since_ts} until_time:{current_until} {QUERY_FILTERS}"
        data = api_get({"queryType": "Latest", "query": q})
        tweets = data.get("tweets", [])
        calls += 1
        if not tweets:
            break

        batch = []
        for t in tweets:
            # Second retweet check in code (query filter is only partial)
            if t.get("isRetweet", False):
                continue
            tid = t.get("id")
            if not tid:
                continue
            with seen_ids_lock:
                if tid in seen_ids:
                    with stats_lock:
                        stats["duplicates"] += 1
                    continue
                seen_ids.add(tid)
            batch.append(json.dumps(t, ensure_ascii=False) + "\n")
            added += 1

        if batch:
            with write_lock:
                f_out.writelines(batch)

        # Slide: next until = earliest in this batch - 1 second
        earliest = min(parse_time(t["createdAt"]) for t in tweets)
        if earliest < current_until:
            current_until = earliest - 1
        else:
            break  # guard against infinite loop
        if len(tweets) < 20:
            break

    with stats_lock:
        stats["total"] += added
        stats["api_calls"] += calls
        stats["slots_done"] += 1
    return label, added, calls


def main():
    # 24 hourly slots
    slots = [
        (DAY_START + h * 3600, DAY_START + (h + 1) * 3600, f"{h:02d}:00")
        for h in range(24)
    ]
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f_out:
        start = time.time()
        with ThreadPoolExecutor(max_workers=WORKERS) as ex:
            futs = {ex.submit(fetch_slot, s, u, l, f_out): l for s, u, l in slots}
            for fut in as_completed(futs):
                lbl, n, c = fut.result()
                print(f"[{lbl}] +{n} ({c} calls) | {stats['slots_done']}/24 | total={stats['total']}", flush=True)
        print(f"\nDONE total={stats['total']} api_calls={stats['api_calls']} {time.time()-start:.1f}s")


if __name__ == "__main__":
    main()
Record completed days in progress.json. On restart, skip them and continue from pending. The day-sharded reference script does this.

Store the unmodified API response alongside the standardized output in a _raw.jsonl file. Later field additions or schema changes can be reprocessed from raw without re-fetching.

until_time is exclusive on the upper side. If you pass the exact timestamp of the earliest tweet as the next until_time, you may get the same tweet again (boundary overlap) or skip neighboring tweets at the same second. earliest - 1 avoids both.

Stop the loop if a batch's earliest timestamp does not fall below the current until_time; that's an infinite-loop guard.

Sign in and copy your key from the dashboard.

Copy the script that fits your workload: day-sharding for broad historical ranges, hour-sharding for busy keywords.

Set KEYWORD, START_DATE, END_DATE, then run python historical_scraper.py.
Start with the free tier — no credit card — and upgrade once you need the full 20 QPS. Transparent per-tweet pricing, no rate-limit games.