132 lines
4.1 KiB
Python
132 lines
4.1 KiB
Python
import os
|
|
import yaml
|
|
from datetime import datetime, timedelta
|
|
from zoneinfo import ZoneInfo
|
|
from flask import Flask, jsonify, request, render_template, Response
|
|
from pymongo import MongoClient
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from dotenv import load_dotenv
|
|
|
|
# Load variables from a .env file (python-dotenv) before the os.environ
# reads below.
load_dotenv()

app = Flask(__name__)

# Time zone used for period boundaries, snapshot timestamps and the scheduler.
RIGA = ZoneInfo("Europe/Riga")

# MongoDB
# MONGODB_URI is mandatory (KeyError at import time when unset); the database
# name falls back to "dailyform" when MONGODB_DB is not set.
client = MongoClient(os.environ["MONGODB_URI"])
db = client[os.getenv("MONGODB_DB", "dailyform")]
answers_col = db["current_answers"]
snapshots_col = db["snapshots"]
|
|
|
|
def load_config():
    """Read and parse ``questions.yaml`` from the current working directory.

    The file is re-read on every call.

    Returns:
        The parsed YAML document. An empty file, which ``yaml.safe_load``
        parses to ``None``, is returned as an empty dict so that ``.get()``
        on the result does not fail.

    Raises:
        FileNotFoundError: if ``questions.yaml`` does not exist.
        yaml.YAMLError: if the file is not valid YAML.
    """
    # Explicit encoding: the platform default is not UTF-8 everywhere, and
    # the question text may contain non-ASCII characters.
    with open("questions.yaml", "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    return {} if config is None else config
|
|
|
|
def get_questions():
    """Return the list of question definitions from the config.

    Returns:
        The value stored under the ``questions`` key of ``load_config()``,
        or an empty list when the config is empty or the key is missing
        or null.
    """
    # `or {}` / `or []` cover an empty YAML file (parsed as None) and a bare
    # `questions:` entry with no value; both would otherwise hand None to
    # callers that iterate over the result.
    config = load_config() or {}
    return config.get("questions") or []
|
|
|
|
def get_current_period():
    """Return the 24-hour window, bounded at 10:00 Riga time, containing now.

    Returns:
        A dict with ``"start"`` and ``"end"`` keys holding ISO-8601 strings.
        Before 10:00 the window runs from yesterday 10:00 to today 10:00;
        from 10:00 onwards it runs from today 10:00 to tomorrow 10:00.
    """
    current = datetime.now(RIGA)
    boundary = current.replace(hour=10, minute=0, second=0, microsecond=0)
    one_day = timedelta(days=1)

    if current >= boundary:
        begin, finish = boundary, boundary + one_day
    else:
        begin, finish = boundary - one_day, boundary

    return {"start": begin.isoformat(), "end": finish.isoformat()}
|
|
|
|
def take_snapshot():
    """Archive the current answers document, then reset it to empty.

    Inserts one document into the snapshots collection containing the
    snapshot time, its calendar date, the start date of the current period,
    and a copy of the answers (without ``_id``). Afterwards the current
    answers document is replaced by an empty one.
    """
    taken_at = datetime.now(RIGA)

    current_answers = answers_col.find_one({}, {"_id": 0})
    if not current_answers:
        current_answers = {}

    date_str = taken_at.strftime("%Y-%m-%d")

    # NOTE(review): when the 10:00 cron job fires, get_current_period()
    # already reports the period that is just starting, so for_date equals
    # date_str even though the answers were collected during the previous
    # period. Confirm this is the intended label.
    period_start = datetime.fromisoformat(get_current_period()["start"])
    for_date = period_start.strftime("%Y-%m-%d")

    record = {
        "timestamp": taken_at.isoformat(),
        "date": date_str,
        "for_date": for_date,
        "answers": current_answers,
    }
    snapshots_col.insert_one(record)

    # Start the next period with a blank answers document.
    answers_col.replace_one({}, {}, upsert=True)
    app.logger.info(f"Snapshot taken: {date_str}, for_date: {for_date}")
|
|
|
|
# Background job: run take_snapshot every day at 10:00 Riga time.
# The scheduler is started at import time, so every process that imports
# this module runs its own copy of the job.
scheduler = BackgroundScheduler(timezone=RIGA)
scheduler.add_job(take_snapshot, trigger="cron", hour=10, minute=0)
scheduler.start()
|
|
|
|
@app.route("/")
def index():
    """Render the form page with the question list and the current period."""
    context = {
        "questions": get_questions(),
        "period": get_current_period(),
    }
    return render_template("index.html", **context)
|
|
|
|
@app.route("/history")
def history():
    """Render the history page, passing it the question definitions."""
    question_defs = get_questions()
    return render_template("history.html", questions=question_defs)
|
|
|
|
@app.route("/api/answers", methods=["GET"])
def get_answers():
    """Return the current answers document as JSON (``{}`` if none stored)."""
    stored = answers_col.find_one({}, {"_id": 0})
    if not stored:
        return jsonify({})
    return jsonify(stored)
|
|
|
|
@app.route("/api/answers", methods=["POST"])
def save_answers():
    """Replace the current answers document with the posted JSON object.

    Returns:
        ``{"ok": true}`` on success, or ``{"error": "invalid"}`` with HTTP
        400 when the body is missing, is not parseable as JSON, or is not a
        JSON object.
    """
    # silent=True makes get_json() return None for a wrong content type or
    # malformed JSON instead of aborting with Flask's own error response, so
    # every bad request gets the same JSON error below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "invalid"}), 400

    # "_id" is managed by MongoDB: a client-supplied value that differs from
    # the stored document's would make replace_one fail. The GET endpoint
    # never exposes it, so it is dropped here.
    data.pop("_id", None)

    answers_col.replace_one({}, data, upsert=True)
    return jsonify({"ok": True})
|
|
|
|
@app.route("/api/snapshots")
def get_snapshots():
    """Return stored snapshots as a JSON list, sorted by timestamp descending.

    Query parameters:
        limit: maximum number of snapshots to return (default 365).

    Returns:
        A JSON array of snapshot documents without ``_id``, or
        ``{"error": "invalid"}`` with HTTP 400 when ``limit`` is not an
        integer.
    """
    # A non-numeric ?limit= used to raise ValueError and surface as HTTP 500.
    try:
        limit = int(request.args.get("limit", 365))
    except ValueError:
        return jsonify({"error": "invalid"}), 400

    cursor = snapshots_col.find({}, {"_id": 0}).sort("timestamp", -1).limit(limit)
    return jsonify(list(cursor))
|
|
|
|
@app.route("/api/snapshots/<date_str>/md")
def snapshot_as_md(date_str):
    """Return the snapshot stored for *date_str* as a Markdown download.

    Args:
        date_str: value matched against the snapshot's ``date`` field.

    Returns:
        A ``text/markdown`` attachment response with one section per
        configured question, or ``("Not found", 404)`` when no snapshot has
        that date.
    """
    # NOTE(review): if several snapshots share a date (e.g. a manual one plus
    # the scheduled one), find_one returns whichever it encounters first.
    # Confirm whether the latest should be preferred.
    snap = snapshots_col.find_one({"date": date_str}, {"_id": 0})
    if not snap:
        return "Not found", 404

    answers = snap.get("answers") or {}

    lines = [f"# Daily Check-in — {date_str}", ""]
    lines.append(f"_Snapshot: {snap.get('timestamp', '?')}_")
    if answers.get("__done__"):
        done_at = answers.get("__done_at__", "")
        try:
            done_at = datetime.fromisoformat(done_at).strftime("%H:%M")
        except (TypeError, ValueError):
            # Not an ISO timestamp (or not a string): show the raw value.
            pass
        lines.append(f"_Marked as filled at: {done_at}_")
    lines += ["", "---", ""]

    for q in get_questions():
        lines += [
            f"## {q['label']}",
            "",
            _format_answer(q, answers.get(q["id"])),
            "",
        ]

    md = "\n".join(lines)
    return Response(md, mimetype="text/markdown",
                    headers={"Content-Disposition": f"attachment; filename=checkin-{date_str}.md"})


def _format_answer(question, val):
    """Render one stored answer as a single Markdown line for *question*.

    Stored answers come from client-posted JSON, so values whose shape does
    not match the question type fall back to ``str(val)`` instead of raising.
    """
    if val is None:
        return "_(no answer)_"

    qtype = question.get("type")
    if qtype == "yesno":
        return "Yes" if val else "No"
    if qtype in ("duration", "time"):
        # The value is split into whole hours and remaining minutes (HH:MM).
        try:
            hours, minutes = divmod(int(val), 60)
        except (TypeError, ValueError, OverflowError):
            return str(val)
        return f"{hours:02d}:{minutes:02d}"
    if qtype == "multichoice" and isinstance(val, list):
        # str() each item so non-string choices do not break the join.
        return ", ".join(str(item) for item in val)
    return str(val)
|
|
|
|
@app.route("/api/snapshot/now", methods=["POST"])
def snapshot_now():
    """Take a snapshot immediately, using the same routine as the cron job."""
    take_snapshot()
    result = {"ok": True}
    return jsonify(result)
|
|
|
|
# Direct-run entry point: serve on all interfaces, port 5000, debug mode off.
if __name__ == "__main__":
    app.run(
        host="0.0.0.0",
        port=5000,
        debug=False,
    )
|