const {Subject, concat, of, from}  = require('rxjs')
const {take, filter, map, flatMap, catchError} = require('rxjs/operators')
const uuid_v4 = require("uuid").v4

let iceServers
const getIceServers = async (firebase) => {
  if (iceServers) {
    return iceServers
  }
  const func = firebase.functions().httpsCallable('getICEServers')
  const response = await func.call({})
  iceServers = response.data.iceServers
  console.log("ICE SERVERS", iceServers)
  iceServers = iceServers.filter(url => !url.startsWith('turn'))
  return iceServers
}

class Ntp {
  Ntp() {
    this.update()
  }
  update = () => {
    client.syncTime().then(packet => {
      console.log("TIME", packet)
      this.ntp = Date.parse(packet.time)
      this.start = Date.now()
    })
  }
  now = () => this.ntp + Date.now() - this.start
}

const ntp = new Ntp()

const collectionChanges = ref => {
  const subject = new Subject()
  ref.onSnapshot((snapshot) => {
    subject.next(snapshot.docChanges())
  })
  return subject
}

const doc = ref => {
  const subject = new Subject()
  ref.onSnapshot(snap => {
    subject.next(snap)
  })
  return subject
}

const { nodeDataChannel } = require('./DataChannel.js')


const sendDCMessage = (dc, message) => {
  //console.log('sending message', message)
  if (typeof(message) === 'string') {
    dc.sendMessage(message)
  } else {
    dc.sendMessageBinary(message)
  }
}

class BroadcastServer {

  constructor (serverId, opts) {
    this.opts = opts || {}
    this.serverId = serverId
    this.opts = opts
  }

  to = {} 

  addPeer = peer => {
    const dc = peer.getDataChannel(this.serverId, this.opts)
    this.to[dc.getId()] = dc
  }

  broadcast = message => {
    for (const id in this.to) {
      const to = this.to[id]
      if (this.opts.binary) {
        to.sendBinary(message)
      } else {
        to.send(message)
      }
    }
  }
}

class JsonServer {

  constructor (client, id, opts) {
    this.init(client, id, opts || {})
  }

  init = async (client, id, opts) => {
    this.dc = await client.openDataChannel(id, opts)
    this.sub = this.dc.observeMessages().subscribe(async message => {
      const json = JSON.parse(message)
      const { reqId, type } = json
      const response = {
        reqId
      }
      try {
        const result = await this.onCall(type, json[type])
        response.reply = result
      } catch (exception) {
        response.exception = exception
      }
      return this.dc.send(JSON.stringify(response))
    })
  }

  async onCall (type, value) {
  }

  close = () => {
    this.sub.unsubscribe()
    this.dc.close()
  }
}

class JsonClient {

  constructor (server, id, opts) {
    this.server = server
    this.id = id
    this.opts = opts || {}
  }

  init = async (server, id, opts) => {
    console.log("JsonClient.init", server, id)
    this.dc = await server.openDataChannel(id, opts)
    this.sub = this.dc.observeMessages().subscribe(message => {
      const json = JSON.parse(message)
      console.log('json', json)
      const { reqId, reply, exception } = json
      const k = this.k[reqId]
      const { resolve, reject } = k
      delete this.k[reqId]
      if (exception) {
        return reject(exception)
      }
      resolve(reply)
    })
  }

  reqId = 0

  k = {}

  call = async (type, value) => {
    if (!this.dc) {
      await this.init(this.server, this.id, this.opts)
    }
    return new Promise((resolve, reject) => {
      const reqId = ++this.reqId
      const msg = {
        reqId,
        type,
      }
      msg[type] = value
      const json = JSON.stringify(msg)
      this.k[reqId] = {
        resolve, reject
      }
      this.dc.send(json)
    })
  }

  close = () => {
    this.sub.unsubscribe()
    this.dc.close()
  }
  
}

class P2P {

  constructor (firebase,
               options) {
    const { user, outgoingOnly, audio } = options
    this.user = user
    this.firebase = firebase
    this.initUser(this.user, outgoingOnly)
  }

  peers = {}
  peerSubject = new Subject()
  
  getPeers = () => Object.values(this.peers)
  
  addPeer = peer => {
    console.log('adding peer', peer.endpoint)
    this.peers[peer.endpoint] = peer
    peer.observeHangup().subscribe(() => this.removePeer(peer))
    this.peerSubject.next({
      type: 'added',
      peer,
    })
  }
  
  removePeer = peer => {
    console.log('removing peer', peer.endpoint)
    delete this.peers[peer.endpoint]
    this.peerSubject.next({
      type: 'removed',
      peer,
    })
  }
  
  observePeers = () => {
    const connected = Object.values(this.peers).map(peer => {
      return {
	type: 'added',
	peer
      }
    })
    return concat(from(connected), this.peerSubject)
  }
  
  initUser = (user, outgoingOnly) => {
    console.log("INIT USER", user && user.uid, 'outgoingOnly', outgoingOnly)
    if (outgoingOnly) return
    if (!user) {
      if (this._answerer) {
	this._answerer.unsubscribe()
	this._answerer = null
      }
      if (this._called) {
	this._called.unsubscribe()
	this._called = null
      }
      return 
    }
    
    const observeCalls = () => {
      const ref = this.firebase.firestore().collection('Calls').where('to', '==', user.uid).where('toEndpoint', '==', '').where('state', '==', 'call')
      return collectionChanges(ref).pipe(flatMap(changes => {
	return changes.map(change => {
	  const { type, doc } = change
	  return {
	    type,
	    call: doc
	  }
	})
      }), catchError(err => {
	console.error("ERROR", err)
      }))
    }
    
    this._answerer = observeCalls().subscribe(async change => {
      console.log("observeCall", change.call.id, change.type, JSON.stringify(change.call.data()))
      if (change.type !== 'added') {
	console.log('ignoring modified call')
	return
      }
      const call = change.call
      const callData = change.call.data()
      if (callData.toEndpoint != '') {
	console.log('ignoring answered call')
	return
      }
      console.log('answering call')
      const answer = async () => {
	const db = this.firebase.firestore()
	const endpoint = uuid_v4()
	const peerIndex = this.getNextPeerIndex()
	try {
	  console.log("answer call", call.id, JSON.stringify(callData))
	  const ref = db.collection('Calls').doc(call.id)
	  let result
	  while (result === undefined) {
	    result = await db.runTransaction(async t => {
	      const snap = await t.get(ref)
	      if (!snap.exists) {
		//console.log('doesn\'t exist')
		return
	      }
	      const data = snap.data()
	      //console.log('transact data', data)
	      if (data.toEndpoint == '') {
		//////console.log("answering call in transaction", data)
		await t.set(ref, {
		  toEndpoint: endpoint,
		  toPeerIndex: peerIndex,
		  state: "answered"
		}, { merge: true })
		return true
	      }
	      console.log('not answering call', call.id, data)
	      return false
	    })
	    console.log("answered call", result)
	  }
	  if (!result) {
	    return
	  }
	  const newPeer = await new Promise((resolve, reject) => {
	    getIceServers(this.firebase).then(iceServers => {
	      const options = {
		firebase: this.firebase,
		peerIndex,
		displayName: callData.fromDisplayName,
		from: this.user,
		endpoint,
		toPeerIndex: callData.fromPeerIndex,
		to: callData.from,
		toEndpoint: callData.fromEndpoint,
		callId: call.id,
		isOffer: false,
		iceServers,
		resolve,
		reject,
		maxPacketLifetime: callData.maxPacketLifetime,
		maxRetransmits: callData.maxRetransmits,
		unordered: callData.unordered,
                audioTrackInit: this.audioTrackInit
	      }
	      const p = new Peer(options)
	    })
	  })
	  this.addPeer(newPeer);
	  this.callSubject.next(newPeer)
	} catch (err) {
	  console.log('ERROR answer running trans', err)
	  process.exit(1)
	}
      }
      console.log('IncomingCall Observers', this.incomingCallSubject.observers.length);
      if (this.incomingCallSubject.observers.length > 0) {
	this.incomingCallSubject.next({
	  uid: callData.from,
	  displayName: callData.fromDisplayName,
	  answer
	})
      } else {
	answer()
      }
    })
  }

  incomingCallSubject = new Subject()
  observeIncomingCalls = () => incomingCallSubject

  callSubject = new Subject()

  observeCalls = () => this.callSubject

  peerIndex = 0
  getNextPeerIndex = () => {
    this.peerIndex = this.peerIndex + 1
    return this.peerIndex
  }

  call = async (to, displayName) => {
    const endpoint = uuid_v4()
    const peerIndex = this.getNextPeerIndex()
    return getIceServers(this.firebase).then(iceServers => {
      let unordered = false
      let maxPacketLifetime = 0
      let maxRetransmits = null
      return new Promise(async (resolve, reject) => {
	const call = {
	  from: this.user.uid,
	  fromEndpoint: endpoint,
	  fromPeerIndex: peerIndex,
	  fromDisplayName: displayName || '',
	  to,
	  toEndpoint: '',
	  state: 'call',
	  unordered,
	  maxPacketLifetime,
	  maxRetransmits,
	  when: Date.now()
	}
	const ref = await this.firebase.firestore().collection('Calls').add(call)
	console.log('making call', call)
	let peer
	resolve.sub = doc(ref).subscribe(doc => {
	  const data = doc.data()
	  console.log('CALL data', JSON.stringify(data))
	  if (data && data.state === 'answered') {
	    resolve.sub.unsubscribe()
	    if (!peer) {
	      //console.log("ANSWERED", JSON.stringify(data))
	      const options = {
		firebase: this.firebase,
		peerIndex,
		displayName: data.fromDisplayName,
		from: this.user,
		endpoint,
		toPeerIndex: data.fromPeerIndex,
		to: data.to,
		toEndpoint: data.toEndpoint,
		callId: ref.id,
		isOffer: true,
		iceServers,
		resolve: peer => {
		  this.addPeer(peer)
		  resolve(peer)
		},
		reject,
		unordered,
		maxPacketLifetime,
		maxRetransmits,
                audioTrackInit: this.audioTrackInit
	      }
	      peer = new Peer(options)
	    }
	  }
	})
      })
    })
  }
}


class DataChannel {

  constructor (pc, label, dc, opts) {
    this.opts = opts
    this.pc = pc
    this.label = label
    if (dc) {
      this.initDC(dc)
    }
  }

  getLabel = () => this.label

  initDC = (dc) => {
    this.dc = dc
    dc.onMessage(message => {
      //console.log('received message', message)
      this.messageSubject.next(message)
    })
    dc.onOpen(this.fireOpenDC)
    dc.onClosed(this.onCloseDC)
  }

  wasOpen = false

  fireOpenDC = () => {
    const isOpen = this.isOpen()
    console.log('FIRE OPEN DC', isOpen)
    if (isOpen != this.wasOpen) {
      this.wasOpen = isOpen
      this.onOpenSubject.next(isOpen)
    }
  }

  onCloseDC = async () => {
    this.fireOpenDC()
    this.pc.onCloseDC(this)
  }

  onOpenSubject = new Subject()

  messageSubject = new Subject()
  
  isOpen = () => {
    return this.dc && this.dc.isOpen()
  }

  observeIsOpen = () => {
    if (this.isOpen()) {
      return concat(of(true), this.onOpenSubject)
    }
    return this.onOpenSubject
  }

  observeMessages = () => {
    return this.messageSubject
  }

  send = (message) => {
    try {
      return sendDCMessage(this.dc, message)
    } catch (exc) {
      this.dc.close()
      console.error(exc)
    }
  }
}


class Peer {

  constructor (options) {
    let { firebase, unordered, maxRetransmits, maxPacketLifetime, peerIndex, displayName, from, endpoint, toPeerIndex, to, toEndpoint, callId, isOffer, iceServers, resolve, reject, audioTrackInit } = options;
    //////console.log("Peer iceServers:", iceServers)
    this.peerIndex = peerIndex
    this.firebase = firebase
    this.from = from
    this.uid = from.uid
    this.endpoint = endpoint
    this.callId = callId
    this.isOffer = isOffer
    this.name = this.uid+'-'+endpoint
    this.displayName = displayName
    this.to = to
    this.toEndpoint = toEndpoint
    this.toPeerIndex = toPeerIndex
    console.log("Peer", options)
    this.audioTrackInit = audioTrackInit
    this.pc = new nodeDataChannel.PeerConnection(this.name, { iceServers, disableAutoNegotation: true })
    this.localDescription = ''
    this.localCandidates = []
    const pc = this.pc
    if (this.isOffer) {
      this.pc.setLocalDescription('Offer')
    } 

    pc.onDataChannel(dc => {
      const label = dc.getLabel()
      if (label === 'control') {
        dc.close()
        return 
      }
      if (this.dcs[label]) {
        this.dcs[label].initDC(dc)
      } else {
        this.dcs[label] = new DataChannel(this, label, dc)
      }
      this.channelSubject.next(this.dcs[label])
    })

    pc.onStateChange((state) => {
      if (this.state != state) {
	console.log('state: ', this.peerIndex, this.callId, this.endpoint, state);
	this.state = state
        this.stateSubject.next(state)
      }
    })
    
    pc.onGatheringStateChange((state) => {
      if (this.gatheringState !== state) {
	//////console.log('GatheringState: ', state);
	this.gatheringState = state
      }
    })
    pc.onLocalDescription(async (sdp, type) => {
      if (true || !this.localDescription) {
	console.log("got local description", sdp, type)
	type = this.isOffer ? 'offer' : 'answer'
	const updates = {}
	this.localDescription = {
	  sdp, type
	}
	updates[type] = this.localDescription
	console.log('sent ', type, this.peerIndex, this.callId, this.endpoint)
	await this.updateDb(updates)
      }
    })
    pc.onLocalCandidate((candidate, mid) => {
      console.log("got local candidate", candidate, mid)
      if (this.gatheringState !== 'completed') {
	this.localCandidates.push({ candidate, mid })
	this.localCandidates.sort((x, y) => x.candidate.localeCompare(y.candidate))
	let updates
	if (this.isOffer) {
	  updates = {
	    fromIceCandidates: this.localCandidates
	  }
	} else {
	  updates = {
	    toIceCandidates: this.localCandidates
	  }
	}
	this.updateDb(updates)
      }
    })
    this.listenForSignaling()
    if (isOffer) {
      this.control = this.pc.createDataChannel('control',  { ordered: true })
    }
    const sub = this.observeIsOpen().subscribe(isOpen => {
      console.log("IS OPEN", isOpen)
      if (isOpen) {
        this.deleteCallRec()
	sub.unsubscribe()
	resolve(this)
      }
    })
  }

  close = () => this.pc.close()

  observeState = () => {
    if (this.state) {
      return concat(of(this.state), this.stateSubject)
    }
    return this.stateSubject
  }

  stateSubject = new Subject()

  channelSubject = new Subject()
  
  observeDataChannel = () => this.channelSubject
  
  observeIsOpen = () => this.observeState().pipe(map(state => state === 'connected'))

  createDC = (label, opts) => {
    opts = opts || {
      reliable: true, unordered: false
    }
    const { reliable, unordered } = opts
    const ordered = !unordered;
    let dc
    if (reliable) {
      let init = {
        ordered,
      }
      dc = this.pc.createDataChannel(label, init)
    } else {
      let { maxRetransmits, maxPacketLifetime} = opts
      if (!maxRetransmits && !maxPacketLifetime) {
        maxRetransmits = 0
        maxPacketLifetime = undefined;
      }
      init = {
        maxRetransmits,
        maxPacketLifetime,
      }
      dc = this.pc.createDataChannel(label, init)
    }
    return dc
  }

  dcs = {}

  getDataChannel = label => {
    if (!this.dcs[label]) {
      const ch = new DataChannel(this, label)
      this.dcs[label] = ch
      this.channelSubject.next(ch)
    }
    return this.dcs[label]
  }

  openDataChannel = async (label, opts) => {
    const dc = this.createDataChannel(label, opts)
    if (dc.isOpen()) {
      return dc
    }
    return new Promise(resolve => {
      dc.observeIsOpen().subscribe(isOpen => {
        if (isOpen) {
          resolve(dc)
        }
      })
    })
  }

  createDataChannel = (label, opts) => {
    let ch
    if (!(ch = this.dcs[label])) {
      ch = this.dcs[label] = new DataChannel(this, label)
    }
    if (this.isOffer) {
      ch.initDC(this.createDC(label, opts))
    }
    this.channelSubject.next(ch)
    return ch
  }

  onCloseDC = channel => {
    delete this.dcs[channel.label]
  }

  ref = () => this.firebase.firestore().collection('Calls').doc(this.callId)

  updateDb = async (updates) => {
    for (const x in updates) {
      //////console.log('updating database', this.peerIndex, this.endpoint, this.state, updates)
      await this.ref().set(updates, { merge: true })
      break
    }
  }

  deleteCallRec = async () => {
    await this.ref().delete()
  }

  remoteCandidates = {}

  hangup = () => {
    const pc = this.pc
    const listener = this.listener
    if (listener) listener.unsubscribe()
    if (pc) {
      pc.close()
      this.updateDb({ state: 'hangup' })
    }
    this.listener = null
    this.hangupSubject.next()
  }

  hangupSubject = new Subject()
  observeHangup = () => this.hangupSubject

  listenForSignaling = () => {
    //////console.log(this.from, this.endpoint, 'connecting to', this.callId)
    const candidates = {}
    let queue = []
    this.listener = doc(this.ref()).subscribe(snap => {
      if (!this.pc) {
	return;
      }
      const data = snap.data()
      if (data && data.state == 'hangup') {
	this.listener.unsubscribe()
	this.hangup()
	return
      }
      if (data) {
	//console.log("signaling data", JSON.stringify(data))
	const { state, fromIceCandidates, toIceCandidates, offer, answer } = data
	if (this.isOffer) {
	  if (answer && !this.gotAnswer) {
	    const { sdp, type } = answer
	    try {
	      this.pc.setRemoteDescription(sdp, type)
	      this.gotAnswer = true
	      //console.log("got answer", this.peerIndex, this.callId, this.endpoint)
	      queue.forEach(c => {
		const { candidate, mid } = c
                console.log("ADD REMOTE CANDIDATE (to)", candidate)                    
		this.pc.addRemoteCandidate(candidate, mid)
	      })
	      queue = []
	    } catch (err){
	      ////console.log(this.state, err)
	    }
	  }
	  if (toIceCandidates) {
	    if (false && !this.gotAnswer) {
	      queue = toIceCandidates
	    } else {
	      toIceCandidates.forEach(c => {
		if (!candidates[c.candidate]) {
		  candidates[c.candidate] = c
		  const { candidate, mid } = c
		  try {
                    console.log("ADD REMOTE CANDIDATE (to)", candidate)                    
		    this.pc.addRemoteCandidate(candidate, mid)
		  } catch (err) {
		    console.warn(err)
		  }
		}
	      })
	    }
	  }
	} else {
	  if (offer && !this.gotOffer) {
	    const { sdp, type } = offer
	    try {
	      this.pc.setRemoteDescription(sdp, type)
	      this.pc.setLocalDescription('Answer')
	      this.gotOffer = true
	      //console.log("got offer", this.peerIndex, this.callId, this.endpoint)
	      queue.forEach(c => {
		const { candidate, mid } = c
                console.log("ADD REMOTE CANDIDATE (to)", candidate)                    
		this.pc.addRemoteCandidate(candidate, mid)
	      })
	      queue = []
	    } catch (err) {
	      ////console.log(this.state, err)
	    }
	  }
	  if (fromIceCandidates) {
	    if (false && !this.gotOffer) {
	      queue = fromIceCandidates
	    } else {
	      fromIceCandidates.forEach(c => {
		if (!candidates[c.candidate]) {
		  candidates[c.candidate] = c
		  const { candidate, mid } = c
                  console.log("ADD REMOTE CANDIDATE (from)", candidate)                    
		  this.pc.addRemoteCandidate(candidate, mid)
		}
	      })
	    }
	  }
	}
      }
    })
  }
}

//module.exports = { P2P, JsonServer, JsonClient }
export { P2P, JsonServer, JsonClient }
