First commit
This commit is contained in:
131
app.py
Normal file
131
app.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import os
|
||||
import yaml
|
||||
from datetime import datetime, timedelta
|
||||
from zoneinfo import ZoneInfo
|
||||
from flask import Flask, jsonify, request, render_template, Response
|
||||
from pymongo import MongoClient
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
app = Flask(__name__)
|
||||
RIGA = ZoneInfo("Europe/Riga")
|
||||
|
||||
# MongoDB
|
||||
client = MongoClient(os.environ["MONGODB_URI"])
|
||||
db = client[os.environ.get("MONGODB_DB", "dailyform")]
|
||||
answers_col = db["current_answers"]
|
||||
snapshots_col = db["snapshots"]
|
||||
|
||||
def load_config():
|
||||
with open("questions.yaml", "r") as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
def get_questions():
|
||||
return load_config().get("questions", [])
|
||||
|
||||
def get_current_period():
|
||||
now = datetime.now(RIGA)
|
||||
today_10 = now.replace(hour=10, minute=0, second=0, microsecond=0)
|
||||
if now < today_10:
|
||||
start = today_10 - timedelta(days=1)
|
||||
end = today_10
|
||||
else:
|
||||
start = today_10
|
||||
end = today_10 + timedelta(days=1)
|
||||
return {"start": start.isoformat(), "end": end.isoformat()}
|
||||
|
||||
def take_snapshot():
|
||||
now = datetime.now(RIGA)
|
||||
doc = answers_col.find_one({}, {"_id": 0}) or {}
|
||||
date_str = now.strftime("%Y-%m-%d")
|
||||
period = get_current_period()
|
||||
for_date = datetime.fromisoformat(period["start"]).strftime("%Y-%m-%d")
|
||||
snapshots_col.insert_one({
|
||||
"timestamp": now.isoformat(),
|
||||
"date": date_str,
|
||||
"for_date": for_date,
|
||||
"answers": doc,
|
||||
})
|
||||
answers_col.replace_one({}, {}, upsert=True)
|
||||
app.logger.info(f"Snapshot taken: {date_str}, for_date: {for_date}")
|
||||
|
||||
scheduler = BackgroundScheduler(timezone=RIGA)
|
||||
scheduler.add_job(take_snapshot, "cron", hour=10, minute=0)
|
||||
scheduler.start()
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
return render_template("index.html", questions=get_questions(), period=get_current_period())
|
||||
|
||||
@app.route("/history")
|
||||
def history():
|
||||
return render_template("history.html", questions=get_questions())
|
||||
|
||||
@app.route("/api/answers", methods=["GET"])
|
||||
def get_answers():
|
||||
doc = answers_col.find_one({}, {"_id": 0}) or {}
|
||||
return jsonify(doc)
|
||||
|
||||
@app.route("/api/answers", methods=["POST"])
|
||||
def save_answers():
|
||||
data = request.get_json()
|
||||
if not isinstance(data, dict):
|
||||
return jsonify({"error": "invalid"}), 400
|
||||
answers_col.replace_one({}, data, upsert=True)
|
||||
return jsonify({"ok": True})
|
||||
|
||||
@app.route("/api/snapshots")
|
||||
def get_snapshots():
|
||||
limit = int(request.args.get("limit", 365))
|
||||
docs = list(snapshots_col.find({}, {"_id": 0}).sort("timestamp", -1).limit(limit))
|
||||
return jsonify(docs)
|
||||
|
||||
@app.route("/api/snapshots/<date_str>/md")
|
||||
def snapshot_as_md(date_str):
|
||||
snap = snapshots_col.find_one({"date": date_str}, {"_id": 0})
|
||||
if not snap:
|
||||
return "Not found", 404
|
||||
questions = get_questions()
|
||||
answers = snap.get("answers", {})
|
||||
|
||||
lines = [f"# Daily Check-in — {date_str}", ""]
|
||||
lines.append(f"_Snapshot: {snap.get('timestamp', '?')}_")
|
||||
if answers.get("__done__"):
|
||||
done_at = answers.get("__done_at__", "")
|
||||
try:
|
||||
done_at = datetime.fromisoformat(done_at).strftime("%H:%M")
|
||||
except Exception:
|
||||
pass
|
||||
lines.append(f"_Marked as filled at: {done_at}_")
|
||||
lines += ["", "---", ""]
|
||||
|
||||
for q in questions:
|
||||
val = answers.get(q["id"])
|
||||
lines.append(f"## {q['label']}")
|
||||
lines.append("")
|
||||
if val is None:
|
||||
lines.append("_(no answer)_")
|
||||
elif q["type"] == "yesno":
|
||||
lines.append("Yes" if val else "No")
|
||||
elif q["type"] in ("duration", "time"):
|
||||
h, m = divmod(int(val), 60)
|
||||
lines.append(f"{h:02d}:{m:02d}")
|
||||
elif q["type"] == "multichoice":
|
||||
lines.append(", ".join(val) if isinstance(val, list) else str(val))
|
||||
else:
|
||||
lines.append(str(val))
|
||||
lines.append("")
|
||||
|
||||
md = "\n".join(lines)
|
||||
return Response(md, mimetype="text/markdown",
|
||||
headers={"Content-Disposition": f"attachment; filename=checkin-{date_str}.md"})
|
||||
|
||||
@app.route("/api/snapshot/now", methods=["POST"])
|
||||
def snapshot_now():
|
||||
take_snapshot()
|
||||
return jsonify({"ok": True})
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=5000, debug=False)
|
||||
Reference in New Issue
Block a user