Own-Programming-Language-Tu.../examples/network/twitch_tools.own

191 lines
6.7 KiB
Plaintext
Raw Normal View History

// Twitch Tools
use "std"
use "math"
use "http"
use "json"
use "date"
use "types"
use "functional"
match ARGS {
case (): usage()
case ("status", _): status(ARGS[1])
case ("liveinfo", _): liveInfo(ARGS[1])
case ("chat", _): chat(ARGS[1])
case ("list", "past", _): listPastBroadcasts(ARGS[2])
case ("m3u", "live", _): m3uPlaylistLive(ARGS[2])
case ("m3u", _): m3uPlaylist(ARGS[1])
case ("url", "live", _): urlPlaylistLive("source", ARGS[2])
case ("url", "live", _, _): urlPlaylistLive(ARGS[2], ARGS[3])
case ("url", _): urlPlaylist("source", ARGS[1])
case ("url", _, _): urlPlaylist(ARGS[1], ARGS[2])
case _: usage()
}
def usage() {
println "Usage: twitch_tools [mode] [args*]
mode:
status [channel] - prints channel status (online or offline)
liveinfo [channel] - prints live stream info
chat [vod id] - shows chat
list past [channel] - prints past broadcasts of channel
m3u [vod id] - gets m3u playlist for past broadcast
m3u live [channel] - gets m3u playlist for live channel
url [vod id] - gets url for past broadcast with source quality
url [quality] [vod id] - gets url for past broadcast with given quality
url live [channel] - gets url for live channel with source quality
url live [quality] [vod id] - gets playlist url for live channel with given quality
"
}
def status(channel) {
extract (status, ) = getStreamInfo(channel)
print status
}
def liveInfo(channel) {
extract (status, info) = getStreamInfo(channel)
if (status == "offline") {
println "This channel is offline"
return 0
}
stream = info.stream
channel = stream.channel
// Stream
print (arrayKeyExists("mature", channel) && channel.mature) ? "[NSFW] " : ""
println channel.status
println "Date: " + formatTzDate(stream.created_at)
println "Game: " + (arrayKeyExists("game", stream) ? stream.game : "unknown")
println "Viewers: " + stream.viewers
println sprintf("Language: %s, broadcaster: %s", channel.language, channel.broadcaster_language)
// Previews
preview = stream.preview
println "Previews:"
for resolution : ["small", "medium", "large"] {
if (!arrayKeyExists(resolution, preview)) continue
println sprintf("%8s %s", resolution, preview[resolution])
}
template = preview.template
def formatResolution(w, h) = replace(replace(template, "{width}", w), "{height}", h)
println sprintf("%8s %s", "HD", formatResolution(1280, 720))
println sprintf("%8s %s", "Full HD", formatResolution(1920, 1080))
println sprintf("height: %d, %d fps, %d delay", stream.video_height, int(round(stream.average_fps)), stream.delay)
}
def listPastBroadcasts(channel) {
url = sprintf("https://api.twitch.tv/kraken/channels/%s/videos?limit=100&broadcasts=true", channel)
pastVods = getJsonSync(url)
pastVods = pastVods.videos
for vod : pastVods {
println "=" * 30
println vod._id
if arrayKeyExists("title", vod) {
println vod.title
}
println "Date: " + formatTzDate(vod.recorded_at)
println "Game: " + (arrayKeyExists("game", vod) ? vod.game : "unknown")
println "Duration: " + lengthToTime(vod.length)
println "Views: " + vod.views
println "Previews:"
println vod.preview
if arrayKeyExists("animated_preview", vod) {
println vod.animated_preview
}
for quality : ["chunked", "high", "medium", "low", "mobile"] {
if (!arrayKeyExists(quality, vod.resolutions)
|| vod.resolutions[quality] == "0x0") continue
println sprintf("%s: %s %d fps", quality, vod.resolutions[quality], round(vod.fps[quality]))
}
}
}
def m3uPlaylist(vodId) {
print m3uPlaylistString(vodId)
}
def urlPlaylist(quality, vodId) {
print searchInPlaylist(quality, m3uPlaylistString(vodId))
}
def m3uPlaylistString(vodId) {
id = (indexOf(vodId, "v") == 0) ? substring(vodId, 1) : vodId
// Get token
url = sprintf("https://api.twitch.tv/api/vods/%s/access_token", id)
token = getJsonSync(url)
// Get playlist
url = sprintf("http://usher.twitch.tv/vod/%s?player=twitchweb&allow_source=true"
+ "&nauthsig=%s&nauth=%s", id, token.sig, urlencode(token.token))
return sync(def(ret) = http(url, ret))
}
def m3uPlaylistLive(channel) {
print m3uPlaylistLiveString(channel)
}
def urlPlaylistLive(quality, channel) {
print searchInPlaylist(quality, m3uPlaylistLiveString(channel))
}
def m3uPlaylistLiveString(channel) {
// Get token
url = sprintf("http://api.twitch.tv/api/channels/%s/access_token", channel)
token = getJsonSync(url)
// Get playlist
url = sprintf("http://usher.twitch.tv/api/channel/hls/%s.m3u8?player=twitchweb"
+ "&token=%s&sig=%s&allow_audio_only=true&allow_source=true&type=any&p=%d",
channel, token.token, token.sig, rand(1000000))
return sync(def(ret) = http(url, ret))
}
def chat(vodId) {
id = (indexOf(vodId, "v") == 0) ? substring(vodId, 1) : vodId
url = sprintf("https://api.twitch.tv/kraken/videos/v%s", id)
vodInfo = getJsonSync(url)
if (arrayKeyExists("error", vodInfo)) {
println "Error: " + vodInfo.message
return 0
}
recordDate = parseTzDate(vodInfo.recorded_at)
startTime = toTimestamp(recordDate) / 1000
startTime += (3 * 60 * 60)
endTime = long(startTime + ceil(vodInfo.length / 30.0) * 30)
maxIterations = (endTime - startTime) / 30
for i = 0, i < maxIterations, i++ {
timestamp = startTime + i * 30
url = sprintf("http://rechat.twitch.tv/rechat-messages?start=%d&video_id=v%s", timestamp, id)
chunksString = sync(def(ret) = http(url, ret))
chunksObject = jsondecode(chunksString)
for chunk : chunksObject.data {
attr = chunk.attributes
formattedDate = formatDate(newDate(attr.timestamp))
println sprintf("%s (%s)\n%s\n\n====\n",
attr.tags["display-name"], formattedDate, attr.message)
}
}
}
def searchInPlaylist(quality, m3u) {
quality = toLowerCase(quality)
lines = split(m3u, "\n")
found = false
for line : lines {
lower = toLowerCase(line)
if (contains(line, "#EXT-X-MEDIA") && contains(lower, quality)) {
found = true
} else if (found && contains(lower, "http")) {
return line
}
}
return ""
}
def getStreamInfo(channel) {
url = sprintf("https://api.twitch.tv/kraken/streams/%s", channel)
stream = getJsonSync(url)
return try(def() {
arrayKeyExists("_id", stream.stream)
return ["online", stream]
}, def(type, message) = ["offline", stream])
}
def contains(what, where) = indexOf(what, where) >= 0
def formatTzDate(str) = formatDate(parseTzDate(str), newFormat("yyyy-MM-dd HH:mm:ss"))
def parseTzDate(str) = parseDate(str, newFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"))
def lengthToTime(len) = sprintf("%02d:%02d:%02d", len / 3600, len / 60 % 60, len % 60)
def getJsonSync(url) = sync(def(ret) = http(url, def(r) = ret(jsondecode(r)) ))