// TODO : get header line
// TODO : get stream from response
// TODO : handle contiguous chunks (don't throw first line) : lineStarts dict



// see https://docs.influxdata.com/influxdb/v1.8/tools/api/#influxdb-1-x-http-endpoints


const query = `SELECT count("content") AS "count" from logs where theme = 'TREAT' group by time(1h)`



function parseInfluxApiResults(json) {
    const rows = []
    for (let { statement_id, series } of json.results) {
        for (let { name, columns, values } of series || []) {
            for (let row of values) {
                const entry = {
                    serie: name
                }
                for (let i = 0; i < columns.length; i++) {
                    const value = row[i]
                    const key = columns[i]
                    entry[key] = value
                }
                rows.push(entry)
            }
        }
    }
    return rows
}

import { checkResponse, getMetadata, localFetch } from "../../http-utils"
const getUrl = (spec) => spec.json || spec.url || spec.file

const schema = {
    type: 'object',
    properties: {
        endpoint: {
            type: 'string'
        },
        host: {
            type: 'string'
        },
        port: {
            type: 'number'
        },
        database: {
            type: 'string'
        },
        measurement: {
            type: 'string'
        },
        start: {
            type: 'number'
        },
        end: {
            type: 'number'
        },
        limitPerQuery: {
            type: 'number'
        },
        limit: {
            type: 'number'
        },
        offset: {
            type: 'number'
        },
        where: {
            type: 'array',
            items: {
                type: 'string'
            }
        }
    },
    required: ['host', 'port', 'database', 'measurement']
}

function influxTime(s) {
    if (s.includes(':') || s.includes("-"))
        return `'${s}'`
    else if (Number(s).toString() === s) {
        const num = Number(s)
        if (num < new Date("3000-01-01T00:00:00Z").getTime()) // ms
            return `${num}ms`
        else {
            return `${num}ns`
        }
    } else {
        return s
    }

}

function buildQuery(parameters, what = "*", groups = []) {
    const { measurement, start, end, where, limit, offset } = parameters
    let whereClauses = [...(where || [])]
    if (start)
        whereClauses.push(`time > ${influxTime(start)}`)
    if (end)
        whereClauses.push(`time < ${influxTime(end)}`)
    let query = `SELECT ${what} from ${measurement}`
    if (whereClauses.length) {
        query += ` WHERE ${whereClauses.map(w => Array.isArray(w) ? w.join(" OR ") : w).map(w => `(${w})`).join(' AND ')} `
    }
    if (limit)
        query += ` LIMIT ${limit}`
    if (offset)
        query += ` OFFSET ${offset}`

    return query
}

async function* streamAsyncIterator(stream) {
    const reader = stream.getReader();
    const decoder = new TextDecoder();
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        yield decoder.decode(value, { stream: !done });
    }
    reader.releaseLock();
}
function createClient(config) {
    // console.log('creating influx1 client', config)
    let {
        endpoint,
        host,
        port,
        database,
    } = config.influx1

    if (!endpoint) {
        endpoint = `http://${host || 'localhost'}:${port || 8086}`
    }
    return {
        async query(query) {
            const bodyParts = []
            bodyParts.push(["q", query])
            console.log("influx1 query", query)
            const url = new URL(`${endpoint}/query`)
            url.searchParams.append('db', database || 'dronisos')
            url.searchParams.append('q', query)
            url.searchParams.append('precision', 'n')
            const response = await fetch(url, {
                // method: 'POST', 
                // headers: {
                //     'Content-Type': 'application/x-www-form-urlencoded'
                // }, 
                // body: bodyParts.map(([key, value]) => ([encodeURIComponent(key), encodeURIComponent(value)].join("="))).join('&')
            })

            // let str = ""
            // for await (const value of streamAsyncIterator(response.body)) {
            //     console.log('partial body : ', value)
            //     str += value;
            // }
            // return JSON.parse(str);

            // const txt = await response.text()
            // console.log(txt)

            const json = await response.json()
            const rows = parseInfluxApiResults(json)
            return rows
        }
    }
}

import { DataChunk } from "../chunk"
import { BaseTableType } from "."

const VERBOSE = !!process.env.VERBOSE
export default function (spec, dataset) {
    let client = createClient(spec)
    const influxSpec = spec.influx1
    const timePerOffset = {}
    return {
        schema,
        async getChunkAt(chunkOffset, size) {
            let limit = influxSpec.limit || Number.MAX_SAFE_INTEGER
            let offset = chunkOffset || 0
            if (influxSpec.offset)
                offset += influxSpec.offset
            if (size)
                limit = Math.min(size, limit)
            if (influxSpec.limitPerQuery)
                limit = Math.min(limit, influxSpec.limitPerQuery)
            const queryOptions = { ...influxSpec, limit }

            const TIME_BASED_CURSOR = false
            if (TIME_BASED_CURSOR) {
                if (chunkOffset && timePerOffset[chunkOffset]) {
                    const start = timePerOffset[chunkOffset]
                    // console.log('found time offset', chunkOffset, start)
                    queryOptions.start = start
                } else if (chunkOffset) {
                    throw `could not find time for offset ${chunkOffset}`
                }
            } else {
                queryOptions.offset = offset
            }
            const query = buildQuery(queryOptions)
            if (VERBOSE) {
                console.log(queryOptions)
                console.log(query)
            }
            const result: any[] = await client.query(query)
            const resultSize = result.length
            const last = result[result.length - 1]
            if (last) {
                const nanotime = Number(last.time) //.getNanoTime()
                timePerOffset[chunkOffset + resultSize] = nanotime
                // console.log('new time offset', chunkOffset + resultSize, last.time.getNanoTime())
                // console.log(last.time.getNanoTime())
                // console.log(typeof last.time.getNanoTime())
            }
            // console.log("got", result.length, "samples")
            return { samples: () => result, offset: chunkOffset, size: resultSize, nrows: resultSize } as DataChunk
        },

        async getSize() {
            const query = buildQuery(influxSpec, "count(*)")
            if (VERBOSE) {
                console.log(influxSpec)
                console.log(query)
            }

            const result = await client.query(query)
            // console.log(JSON.stringify(result, null, 2))
            const row: any = result[0]
            let size = 0
            for (let prop in row)
                if (prop.startsWith('count_'))
                    size = Math.max(size, row[prop])
            if (influxSpec.limit)
                size = Math.min(size, influxSpec.limit)
            // console.log("size", size)
            return size
        }
    } as BaseTableType
}


