diff --git a/demo_apps/task-limits/assess-utilization.ipynb b/demo_apps/task-limits/assess-utilization.ipynb index 12022f5..e2d00d3 100644 --- a/demo_apps/task-limits/assess-utilization.ipynb +++ b/demo_apps/task-limits/assess-utilization.ipynb @@ -20,6 +20,7 @@ "import pandas as pd\n", "import numpy as np\n", "import json\n", + "import re\n", "params = {'legend.fontsize': 8,\n", " 'axes.labelsize': 9,\n", " 'axes.titlesize':'x-large',\n", @@ -45,7 +46,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Found 16 runs\n" + "Found 14 runs\n" ] } ], @@ -74,6 +75,7 @@ " config = json.load(fp)\n", " \n", " config['path'] = path.parent\n", + " config['host'] = path.parent.name.rsplit(\"-\", 5)[0]\n", " return config" ] }, @@ -109,6 +111,7 @@ " \n", " # Get the results for each worker\n", " results = pd.read_json(path / \"results.json\", lines=True)\n", + " \n", " if len(results) == 0:\n", " return None\n", " results['worker'] = results['worker_info'].apply(lambda x: f'{x[\"hostname\"]}-{x[\"PARSL_WORKER_RANK\"]}')\n", @@ -146,9 +149,16 @@ "execution_count": 7, "metadata": {}, "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "No artists with labels found to put in legend. Note that artists whose label start with an underscore are ignored when legend() is called with no argument.\n" + ] + }, { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -160,7 +170,7 @@ "source": [ "fig, ax = plt.subplots(figsize=(3.5, 1.75))\n", "\n", - "for (gid, group), m in zip(results.groupby(['task_input_size', 'parallel_tasks']), ['o', 's', '^', 'v']):\n", + "for (gid, group), m in zip(results.query('host==\"x3002c0s37b0n0\"').groupby(['task_input_size', 'parallel_tasks']), ['o', 's', '^', 'v']):\n", " group.sort_values(['task_length', 'utilization'], ascending=False, inplace=True)\n", " group.drop_duplicates('task_length', inplace=True, keep='first')\n", " ax.semilogx(group['task_length'], group['utilization'] * 100, '--'+m, label=f'$s$={gid[0]}MB, $N$={gid[1]}')\n", @@ -174,181 +184,165 @@ "fig.savefig('performance-envelope.pdf')" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Measure the Response, Decision, and Dispatch Times\n", + "We can measure three sources of latency for applications using the logs in the `Result` object for response and dispatch, and the Colmena logs for the reaction time" + ] + }, { "cell_type": "code", "execution_count": 8, "metadata": {}, + "outputs": [], + "source": [ + "def get_median_reaction_time(path: Path):\n", + " \"\"\"Measure the median reaction time for all tasks, total and data-related, \n", + " broken down by compute and data transit\"\"\"\n", + "\n", + " # Loop over the tasks\n", + " compute = []\n", + " data = []\n", + " with path.joinpath('results.json').open() as fp:\n", + " for line in fp:\n", + " record = json.loads(line)\n", + " compute_time = (\n", + " record['timestamp']['result_received'] -\n", + " record['timestamp']['compute_ended']\n", + " ) # Time for the compute message to arrive\n", + " compute.append(compute_time)\n", + "\n", + " # Additional time to read the data\n", + " data_time = compute_time + record['task_info']['read_time']\n", + " data.append(data_time)\n", + "\n", + " return np.percentile(compute, 50), np.percentile(data, 50)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results['path'].apply(get_median_reaction_time)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, "outputs": [ { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
configlocal_hosttask_input_sizetask_output_sizetask_countworker_counttask_lengthtask_length_stduse_proxystoreproxystore_thresholdreuse_dataoutput_dirstore_configparsl_configparallel_taskspathutilization
0NoneTrue1.01.01281610.00.01True1FalserunsStore(name=store, connector=RedisConnector(hos...Config(\\n app_cache=True, \\n checkpoint_...16runs/bettik-linux-2024-05-16_13-39-140.997314
3NoneTrue100.0100.01281610.00.01True1FalserunsStore(name=store, connector=RedisConnector(hos...Config(\\n app_cache=True, \\n checkpoint_...16runs/bettik-linux-2024-05-16_13-48-090.452690
6NoneTrue0.10.11281610.00.01True1FalserunsStore(name=store, connector=RedisConnector(hos...Config(\\n app_cache=True, \\n checkpoint_...16runs/bettik-linux-2024-05-16_13-37-170.997465
10NoneTrue10.010.01281610.00.01True1FalserunsStore(name=store, connector=RedisConnector(hos...Config(\\n app_cache=True, \\n checkpoint_...16runs/bettik-linux-2024-05-16_13-41-260.976018
\n", - "
" - ], - "text/plain": [ - " config local_host task_input_size task_output_size task_count \\\n", - "0 None True 1.0 1.0 128 \n", - "3 None True 100.0 100.0 128 \n", - "6 None True 0.1 0.1 128 \n", - "10 None True 10.0 10.0 128 \n", - "\n", - " worker_count task_length task_length_std use_proxystore \\\n", - "0 16 10.0 0.01 True \n", - "3 16 10.0 0.01 True \n", - "6 16 10.0 0.01 True \n", - "10 16 10.0 0.01 True \n", - "\n", - " proxystore_threshold reuse_data output_dir \\\n", - "0 1 False runs \n", - "3 1 False runs \n", - "6 1 False runs \n", - "10 1 False runs \n", - "\n", - " store_config \\\n", - "0 Store(name=store, connector=RedisConnector(hos... \n", - "3 Store(name=store, connector=RedisConnector(hos... \n", - "6 Store(name=store, connector=RedisConnector(hos... \n", - "10 Store(name=store, connector=RedisConnector(hos... \n", - "\n", - " parsl_config parallel_tasks \\\n", - "0 Config(\\n app_cache=True, \\n checkpoint_... 16 \n", - "3 Config(\\n app_cache=True, \\n checkpoint_... 16 \n", - "6 Config(\\n app_cache=True, \\n checkpoint_... 16 \n", - "10 Config(\\n app_cache=True, \\n checkpoint_... 16 \n", - "\n", - " path utilization \n", - "0 runs/bettik-linux-2024-05-16_13-39-14 0.997314 \n", - "3 runs/bettik-linux-2024-05-16_13-48-09 0.452690 \n", - "6 runs/bettik-linux-2024-05-16_13-37-17 0.997465 \n", - "10 runs/bettik-linux-2024-05-16_13-41-26 0.976018 " - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" + "ename": "IndexError", + "evalue": "cannot do a non-empty take from an empty axes.", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mIndexError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[9], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m results[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mrxn_time_compute\u001b[39m\u001b[38;5;124m'\u001b[39m], results[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mrxn_time_data\u001b[39m\u001b[38;5;124m'\u001b[39m] \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mzip\u001b[39m(\u001b[38;5;241m*\u001b[39mresults[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mpath\u001b[39m\u001b[38;5;124m'\u001b[39m]\u001b[38;5;241m.\u001b[39mapply(get_median_reaction_time))\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/pandas/core/series.py:4771\u001b[0m, in \u001b[0;36mSeries.apply\u001b[0;34m(self, func, convert_dtype, args, **kwargs)\u001b[0m\n\u001b[1;32m 4661\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mapply\u001b[39m(\n\u001b[1;32m 4662\u001b[0m \u001b[38;5;28mself\u001b[39m,\n\u001b[1;32m 4663\u001b[0m func: AggFuncType,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 4666\u001b[0m \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs,\n\u001b[1;32m 4667\u001b[0m ) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m DataFrame \u001b[38;5;241m|\u001b[39m Series:\n\u001b[1;32m 4668\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 4669\u001b[0m \u001b[38;5;124;03m Invoke function on values of Series.\u001b[39;00m\n\u001b[1;32m 4670\u001b[0m \n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 4769\u001b[0m \u001b[38;5;124;03m dtype: float64\u001b[39;00m\n\u001b[1;32m 4770\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m-> 4771\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m SeriesApply(\u001b[38;5;28mself\u001b[39m, func, convert_dtype, args, kwargs)\u001b[38;5;241m.\u001b[39mapply()\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/pandas/core/apply.py:1123\u001b[0m, in \u001b[0;36mSeriesApply.apply\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1120\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mapply_str()\n\u001b[1;32m 1122\u001b[0m \u001b[38;5;66;03m# self.f is Callable\u001b[39;00m\n\u001b[0;32m-> 1123\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mapply_standard()\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/pandas/core/apply.py:1174\u001b[0m, in \u001b[0;36mSeriesApply.apply_standard\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1172\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 1173\u001b[0m values \u001b[38;5;241m=\u001b[39m obj\u001b[38;5;241m.\u001b[39mastype(\u001b[38;5;28mobject\u001b[39m)\u001b[38;5;241m.\u001b[39m_values\n\u001b[0;32m-> 1174\u001b[0m mapped \u001b[38;5;241m=\u001b[39m lib\u001b[38;5;241m.\u001b[39mmap_infer(\n\u001b[1;32m 1175\u001b[0m values,\n\u001b[1;32m 1176\u001b[0m f,\n\u001b[1;32m 1177\u001b[0m convert\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mconvert_dtype,\n\u001b[1;32m 1178\u001b[0m )\n\u001b[1;32m 1180\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(mapped) \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(mapped[\u001b[38;5;241m0\u001b[39m], ABCSeries):\n\u001b[1;32m 1181\u001b[0m \u001b[38;5;66;03m# GH#43986 Need to do list(mapped) in order to get treated as nested\u001b[39;00m\n\u001b[1;32m 1182\u001b[0m \u001b[38;5;66;03m# See also GH#25959 regarding EA support\u001b[39;00m\n\u001b[1;32m 1183\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m obj\u001b[38;5;241m.\u001b[39m_constructor_expanddim(\u001b[38;5;28mlist\u001b[39m(mapped), index\u001b[38;5;241m=\u001b[39mobj\u001b[38;5;241m.\u001b[39mindex)\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/pandas/_libs/lib.pyx:2924\u001b[0m, in \u001b[0;36mpandas._libs.lib.map_infer\u001b[0;34m()\u001b[0m\n", + "Cell \u001b[0;32mIn[8], line 21\u001b[0m, in \u001b[0;36mget_median_reaction_time\u001b[0;34m(path)\u001b[0m\n\u001b[1;32m 18\u001b[0m data_time \u001b[38;5;241m=\u001b[39m compute_time \u001b[38;5;241m+\u001b[39m record[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mtask_info\u001b[39m\u001b[38;5;124m'\u001b[39m][\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mread_time\u001b[39m\u001b[38;5;124m'\u001b[39m]\n\u001b[1;32m 19\u001b[0m data\u001b[38;5;241m.\u001b[39mappend(data_time)\n\u001b[0;32m---> 21\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m np\u001b[38;5;241m.\u001b[39mpercentile(compute, \u001b[38;5;241m50\u001b[39m), np\u001b[38;5;241m.\u001b[39mpercentile(data, \u001b[38;5;241m50\u001b[39m)\n", + "File \u001b[0;32m<__array_function__ internals>:200\u001b[0m, in \u001b[0;36mpercentile\u001b[0;34m(*args, **kwargs)\u001b[0m\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/numpy/lib/function_base.py:4205\u001b[0m, in \u001b[0;36mpercentile\u001b[0;34m(a, q, axis, out, overwrite_input, method, keepdims, interpolation)\u001b[0m\n\u001b[1;32m 4203\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m _quantile_is_valid(q):\n\u001b[1;32m 4204\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mPercentiles must be in the range [0, 100]\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[0;32m-> 4205\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m _quantile_unchecked(\n\u001b[1;32m 4206\u001b[0m a, q, axis, out, overwrite_input, method, keepdims)\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/numpy/lib/function_base.py:4473\u001b[0m, in \u001b[0;36m_quantile_unchecked\u001b[0;34m(a, q, axis, out, overwrite_input, method, keepdims)\u001b[0m\n\u001b[1;32m 4465\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_quantile_unchecked\u001b[39m(a,\n\u001b[1;32m 4466\u001b[0m q,\n\u001b[1;32m 4467\u001b[0m axis\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 4470\u001b[0m method\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mlinear\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[1;32m 4471\u001b[0m keepdims\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mFalse\u001b[39;00m):\n\u001b[1;32m 4472\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Assumes that q is in [0, 1], and is an ndarray\"\"\"\u001b[39;00m\n\u001b[0;32m-> 4473\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m _ureduce(a,\n\u001b[1;32m 4474\u001b[0m func\u001b[38;5;241m=\u001b[39m_quantile_ureduce_func,\n\u001b[1;32m 4475\u001b[0m q\u001b[38;5;241m=\u001b[39mq,\n\u001b[1;32m 4476\u001b[0m keepdims\u001b[38;5;241m=\u001b[39mkeepdims,\n\u001b[1;32m 4477\u001b[0m axis\u001b[38;5;241m=\u001b[39maxis,\n\u001b[1;32m 4478\u001b[0m out\u001b[38;5;241m=\u001b[39mout,\n\u001b[1;32m 4479\u001b[0m overwrite_input\u001b[38;5;241m=\u001b[39moverwrite_input,\n\u001b[1;32m 4480\u001b[0m method\u001b[38;5;241m=\u001b[39mmethod)\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/numpy/lib/function_base.py:3752\u001b[0m, in \u001b[0;36m_ureduce\u001b[0;34m(a, func, keepdims, **kwargs)\u001b[0m\n\u001b[1;32m 3749\u001b[0m index_out \u001b[38;5;241m=\u001b[39m (\u001b[38;5;241m0\u001b[39m, ) \u001b[38;5;241m*\u001b[39m nd\n\u001b[1;32m 3750\u001b[0m kwargs[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mout\u001b[39m\u001b[38;5;124m'\u001b[39m] \u001b[38;5;241m=\u001b[39m out[(\u001b[38;5;28mEllipsis\u001b[39m, ) \u001b[38;5;241m+\u001b[39m index_out]\n\u001b[0;32m-> 3752\u001b[0m r \u001b[38;5;241m=\u001b[39m func(a, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs)\n\u001b[1;32m 3754\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m out \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 3755\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m out\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/numpy/lib/function_base.py:4639\u001b[0m, in \u001b[0;36m_quantile_ureduce_func\u001b[0;34m(a, q, axis, out, overwrite_input, method)\u001b[0m\n\u001b[1;32m 4637\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 4638\u001b[0m arr \u001b[38;5;241m=\u001b[39m a\u001b[38;5;241m.\u001b[39mcopy()\n\u001b[0;32m-> 4639\u001b[0m result \u001b[38;5;241m=\u001b[39m _quantile(arr,\n\u001b[1;32m 4640\u001b[0m quantiles\u001b[38;5;241m=\u001b[39mq,\n\u001b[1;32m 4641\u001b[0m axis\u001b[38;5;241m=\u001b[39maxis,\n\u001b[1;32m 4642\u001b[0m method\u001b[38;5;241m=\u001b[39mmethod,\n\u001b[1;32m 4643\u001b[0m out\u001b[38;5;241m=\u001b[39mout)\n\u001b[1;32m 4644\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/numpy/lib/function_base.py:4745\u001b[0m, in \u001b[0;36m_quantile\u001b[0;34m(arr, quantiles, axis, method, out)\u001b[0m\n\u001b[1;32m 4737\u001b[0m arr\u001b[38;5;241m.\u001b[39mpartition(\n\u001b[1;32m 4738\u001b[0m np\u001b[38;5;241m.\u001b[39munique(np\u001b[38;5;241m.\u001b[39mconcatenate(([\u001b[38;5;241m0\u001b[39m, \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m1\u001b[39m],\n\u001b[1;32m 4739\u001b[0m previous_indexes\u001b[38;5;241m.\u001b[39mravel(),\n\u001b[1;32m 4740\u001b[0m next_indexes\u001b[38;5;241m.\u001b[39mravel(),\n\u001b[1;32m 4741\u001b[0m ))),\n\u001b[1;32m 4742\u001b[0m axis\u001b[38;5;241m=\u001b[39mDATA_AXIS)\n\u001b[1;32m 4743\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m np\u001b[38;5;241m.\u001b[39missubdtype(arr\u001b[38;5;241m.\u001b[39mdtype, np\u001b[38;5;241m.\u001b[39minexact):\n\u001b[1;32m 4744\u001b[0m slices_having_nans \u001b[38;5;241m=\u001b[39m np\u001b[38;5;241m.\u001b[39misnan(\n\u001b[0;32m-> 4745\u001b[0m take(arr, indices\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m-\u001b[39m\u001b[38;5;241m1\u001b[39m, axis\u001b[38;5;241m=\u001b[39mDATA_AXIS)\n\u001b[1;32m 4746\u001b[0m )\n\u001b[1;32m 4747\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 4748\u001b[0m slices_having_nans \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n", + "File \u001b[0;32m<__array_function__ internals>:200\u001b[0m, in \u001b[0;36mtake\u001b[0;34m(*args, **kwargs)\u001b[0m\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/numpy/core/fromnumeric.py:190\u001b[0m, in \u001b[0;36mtake\u001b[0;34m(a, indices, axis, out, mode)\u001b[0m\n\u001b[1;32m 93\u001b[0m \u001b[38;5;129m@array_function_dispatch\u001b[39m(_take_dispatcher)\n\u001b[1;32m 94\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mtake\u001b[39m(a, indices, axis\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m, out\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mNone\u001b[39;00m, mode\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mraise\u001b[39m\u001b[38;5;124m'\u001b[39m):\n\u001b[1;32m 95\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m 96\u001b[0m \u001b[38;5;124;03m Take elements from an array along an axis.\u001b[39;00m\n\u001b[1;32m 97\u001b[0m \n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 188\u001b[0m \u001b[38;5;124;03m [5, 7]])\u001b[39;00m\n\u001b[1;32m 189\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 190\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m _wrapfunc(a, \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mtake\u001b[39m\u001b[38;5;124m'\u001b[39m, indices, axis\u001b[38;5;241m=\u001b[39maxis, out\u001b[38;5;241m=\u001b[39mout, mode\u001b[38;5;241m=\u001b[39mmode)\n", + "File \u001b[0;32m~/miniconda3/envs/colmena/lib/python3.11/site-packages/numpy/core/fromnumeric.py:57\u001b[0m, in \u001b[0;36m_wrapfunc\u001b[0;34m(obj, method, *args, **kwds)\u001b[0m\n\u001b[1;32m 54\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m _wrapit(obj, method, \u001b[38;5;241m*\u001b[39margs, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwds)\n\u001b[1;32m 56\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m---> 57\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m bound(\u001b[38;5;241m*\u001b[39margs, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwds)\n\u001b[1;32m 58\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m:\n\u001b[1;32m 59\u001b[0m \u001b[38;5;66;03m# A TypeError occurs if the object does have such a method in its\u001b[39;00m\n\u001b[1;32m 60\u001b[0m \u001b[38;5;66;03m# class, but its signature is not identical to that of NumPy's. This\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 64\u001b[0m \u001b[38;5;66;03m# Call _wrapit from within the except clause to ensure a potential\u001b[39;00m\n\u001b[1;32m 65\u001b[0m \u001b[38;5;66;03m# exception has a traceback chain.\u001b[39;00m\n\u001b[1;32m 66\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m _wrapit(obj, method, \u001b[38;5;241m*\u001b[39margs, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwds)\n", + "\u001b[0;31mIndexError\u001b[0m: cannot do a non-empty take from an empty axes." + ] } ], "source": [ - "results.query('task_length > 5')" + "results['rxn_time_compute'], results['rxn_time_data'] = zip(*results['path'].apply(get_median_reaction_time))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_median_decision_time(path: Path):\n", + " \"\"\"Measure the median time for all job submissions.\"\"\"\n", + "\n", + " decision_time = []\n", + " pat = re.compile('Finished submitting new work. Runtime: (\\d\\.\\d+e-?\\d+)s')\n", + " with path.joinpath('run.log').open() as fp:\n", + " for line in fp:\n", + " for match in pat.findall(line):\n", + " decision_time.append(float(match))\n", + " return np.percentile(decision_time, 50) " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results['decision_time'] = results['path'].apply(get_median_decision_time)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_median_dispatch_time(path: Path):\n", + " \"\"\"Measure the median dispatch time for all tasks,\n", + " by until the compute message arrives and when the data arrives\"\"\"\n", + "\n", + " # Loop over the tasks\n", + " compute = []\n", + " data = []\n", + " with path.joinpath('results.json').open() as fp:\n", + " for line in fp:\n", + " record = json.loads(line)\n", + " compute_time = (\n", + " record['timestamp']['compute_started'] \n", + " - record['timestamp']['created']\n", + " + record['time']['deserialize_inputs']\n", + " )\n", + " compute.append(compute_time)\n", + "\n", + " # Add the additional time taken for the data to be accessed\n", + " data_time = 0\n", + " for proxy, timings in record['time'].get('proxy', {}).items():\n", + " if 'store.get' in timings['times']:\n", + " data_time += timings['times']['store.get']['avg_time_ms'] / 1000\n", + " data.append(\n", + " compute_time + data_time\n", + " )\n", + "\n", + " return np.percentile(compute, 50), np.percentile(data, 50)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results['dispatch_time_compute'], results['dispatch_time_data'] = zip(*results['path'].apply(get_median_dispatch_time))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "results" ] }, { diff --git a/demo_apps/task-limits/run.py b/demo_apps/task-limits/run.py index cd692c0..17369e0 100644 --- a/demo_apps/task-limits/run.py +++ b/demo_apps/task-limits/run.py @@ -1,7 +1,7 @@ """Evaluate the effect of task duration and size on throughput""" from platform import node from datetime import datetime -from random import randbytes +from time import perf_counter from typing import TextIO import argparse import json @@ -11,7 +11,6 @@ import time import numpy as np -from proxystore.connectors.file import FileConnector from proxystore.connectors.redis import RedisConnector from proxystore.store import Store, register_store from scipy.stats import truncnorm @@ -118,7 +117,7 @@ def __init__(self, def submit(self): """Submit a new task if resources are available""" runtime, task_size = self.task_queue.pop() - input_data = randbytes(task_size) + input_data = np.empty(task_size, bool) self.queues.send_inputs( input_data, self.task_output_size, runtime, method='target_function') @@ -129,6 +128,15 @@ def submit(self): def resubmitter(self, result: Result): assert result.success, result.failure_info.traceback self.rec.release() + + # Force access to the data + read_time = perf_counter() + data_size = len(result.value) + read_time = perf_counter() - read_time + result.task_info['read_time'] = read_time + result.task_info['read_size'] = data_size + + # Store print(result.json(exclude={'inputs', 'value'}), file=self.output_file, flush=False)