781c8903738280338d5271dbd4c82eb681d42a5d
[arvados-workbench2.git] / src / store / process-logs-panel / process-logs-panel-actions.ts
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16
17 const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
18 const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
19
20 export const processLogsPanelActions = unionize({
21     RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
22     INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
23     SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
24     ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
25 });
26
27 // Max size of logs to fetch in bytes
28 const maxLogFetchSize: number = 128 * 1000;
29
30 type FileWithProgress = {
31     file: CollectionFile;
32     lastByte: number;
33 }
34
35 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
36
37 export const setProcessLogsPanelFilter = (filter: string) =>
38     processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
39
40 export const initProcessLogsPanel = (processUuid: string) =>
41     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
42         try {
43             dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
44             const process = getProcess(processUuid)(getState().resources);
45             if (process?.containerRequest?.uuid) {
46                 // Get log file size info
47                 const logFiles = await loadContainerLogFileList(process.containerRequest.uuid, logService);
48
49                 // Populate lastbyte 0 for each file
50                 const filesWithProgress = logFiles.map((file) => ({file, lastByte: 0}));
51
52                 // Fetch array of LogFragments
53                 const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
54
55                 // Populate initial state with filters
56                 const initialState = createInitialLogPanelState(logFiles, logLines);
57                 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
58             }
59         } catch(e) {
60             // On error, populate empty state to allow polling to start
61             const initialState = createInitialLogPanelState([], []);
62             dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
63             dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Could not load process logs', hideDuration: 2000, kind: SnackbarKind.ERROR }));
64         }
65     };
66
67 export const pollProcessLogs = (processUuid: string) =>
68     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
69         try {
70             // Get log panel state and process from store
71             const currentState = getState().processLogsPanel;
72             const process = getProcess(processUuid)(getState().resources);
73
74             // Check if container request is present and initial logs state loaded
75             if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
76                 const logFiles = await loadContainerLogFileList(process.containerRequest.uuid, logService);
77
78                 // Determine byte to fetch from while filtering unchanged files
79                 const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
80                     // Fetch last byte or 0 for new log files
81                     const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
82
83                     const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
84                     const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
85
86                     if (isNew || isChanged) {
87                         return acc.concat({file: updatedFile, lastByte: currentStateLogLastByte});
88                     } else {
89                         return acc;
90                     }
91                 }, [] as FileWithProgress[]);
92
93                 // Perform range request(s) for each file
94                 const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
95
96                 if (logFragments.length) {
97                     // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
98                     const groupedLogs = groupLogs(logFiles, logFragments);
99                     await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
100                 }
101             }
102             return Promise.resolve();
103         } catch (e) {
104             // Remove log when polling error is handled in some way instead of being ignored
105             console.error("Error occurred in pollProcessLogs:", e);
106             return Promise.reject();
107         }
108     };
109
110 const loadContainerLogFileList = async (containerUuid: string, logService: LogService) => {
111     const logCollectionContents = await logService.listLogFiles(containerUuid);
112
113     // Filter only root directory files matching log event types which have bytes
114     return logCollectionContents.filter((file): file is CollectionFile => (
115         file.type === CollectionFileType.FILE &&
116         PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
117         file.size > 0
118     ));
119 };
120
121 /**
122  * Loads the contents of each file from each file's lastByte simultaneously
123  *   while respecting the maxLogFetchSize by requesting the start and end
124  *   of the desired block and inserting a snipline.
125  * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
126  * @param logService
127  * @param process
128  * @returns LogFragment[] containing a single LogFragment corresponding to each input file
129  */
130 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
131     (await Promise.allSettled(logFilesWithProgress.filter(({file}) => file.size > 0).map(({file, lastByte}) => {
132         const requestSize = file.size - lastByte;
133         if (requestSize > maxLogFetchSize) {
134             const chunkSize = Math.floor(maxLogFetchSize / 2);
135             const firstChunkEnd = lastByte+chunkSize-1;
136             return Promise.all([
137                 logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, firstChunkEnd),
138                 logService.getLogFileContents(process.containerRequest.uuid, file, file.size-chunkSize, file.size-1)
139             ] as Promise<(LogFragment)>[]);
140         } else {
141             return Promise.all([logService.getLogFileContents(process.containerRequest.uuid, file, lastByte, file.size-1)]);
142         }
143     })).then((res) => {
144         if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) {
145             // Since allSettled does not pass promise rejection we throw an
146             //   error if every request failed
147             const error = res.find(
148                 (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected'
149               )?.reason;
150             return Promise.reject(error);
151         }
152         return res.filter((promiseResult): promiseResult is PromiseFulfilledResult<LogFragment[]> => (
153             // Filter out log files with rejected promises
154             //   (Promise.all rejects on any failure)
155             promiseResult.status === 'fulfilled' &&
156             // Filter out files where any fragment is empty
157             //   (prevent incorrect snipline generation or an un-resumable situation)
158             !!promiseResult.value.every(logFragment => logFragment.contents.length)
159         )).map(one => one.value)
160     })).map((logResponseSet)=> {
161         // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
162         //   Don't add snip line as a separate line so that sorting won't reorder it
163         for (let i = 1; i < logResponseSet.length; i++) {
164             const fragment = logResponseSet[i-1];
165             const lastLineIndex = fragment.contents.length-1;
166             const lastLineContents = fragment.contents[lastLineIndex];
167             const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
168
169             logResponseSet[i-1].contents[lastLineIndex] = newLastLine;
170         }
171
172         // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
173         return logResponseSet.reduce((acc, curr: LogFragment) => ({
174             logType: curr.logType,
175             contents: [...(acc.contents || []), ...curr.contents]
176         }), {} as LogFragment);
177     })
178 );
179
180 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): {filters: string[], logs: ProcessLogs} => {
181     const logs = groupLogs(logFiles, logFragments);
182     const filters = Object.keys(logs);
183     return { filters, logs };
184 }
185
186 /**
187  * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
188  * @param logFiles
189  * @param logFragments
190  * @returns ProcessLogs for the store
191  */
192 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
193     const sortableLogFragments = mergeMultilineLoglines(logFragments);
194
195     const allLogs = mergeSortLogFragments(sortableLogFragments);
196     const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
197
198     const groupedLogs = logFragments.reduce((grouped, fragment) => ({
199         ...grouped,
200         [fragment.logType as string]: {lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents}
201     }), {});
202
203     return {
204         [MAIN_FILTER_TYPE]: {lastByte: undefined, contents: mainLogs},
205         [ALL_FILTER_TYPE]: {lastByte: undefined, contents: allLogs},
206         ...groupedLogs,
207     }
208 };
209
210 /**
211  * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
212  *   If there is no previous line (first line has no timestamp), the line is deleted.
213  *   Only used for combined logs that need sorting by timestamp after merging
214  * @param logFragments
215  * @returns Modified LogFragment[]
216  */
217 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
218     logFragments.map((fragment) => {
219         // Avoid altering the original fragment copy
220         let fragmentCopy: LogFragment = {
221             logType: fragment.logType,
222             contents: [...fragment.contents],
223         }
224         // Merge any non-timestamped lines in sortable log types with previous line
225         if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
226             for (let i = 0; i < fragmentCopy.contents.length; i++) {
227                 const lineContents = fragmentCopy.contents[i];
228                 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
229                     // Partial line without timestamp detected
230                     if (i > 0) {
231                         // If not first line, copy line to previous line
232                         const previousLineContents = fragmentCopy.contents[i-1];
233                         const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
234                         fragmentCopy.contents[i-1] = newPreviousLineContents;
235                     }
236                     // Delete the current line and prevent iterating
237                     fragmentCopy.contents.splice(i, 1);
238                     i--;
239                 }
240             }
241         }
242         return fragmentCopy;
243     })
244 );
245
246 /**
247  * Merges log lines of different types and sorts types that contain timestamps (are sortable)
248  * @param logFragments
249  * @returns string[] of merged and sorted log lines
250  */
251 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
252     const sortableLines = fragmentsToLines(logFragments
253         .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType))));
254
255     const nonSortableLines = fragmentsToLines(logFragments
256         .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
257         .sort((a, b) => (a.logType.localeCompare(b.logType))));
258
259     return [...nonSortableLines, ...sortableLines.sort(sortLogLines)]
260 };
261
262 const sortLogLines = (a: string, b: string) => {
263     return a.localeCompare(b);
264 };
265
266 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
267     fragments.reduce((acc, fragment: LogFragment) => (
268         acc.concat(...fragment.contents)
269     ), [] as string[])
270 );
271
272 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
273     return logFiles.find((file) => (file.name.startsWith(key)))?.size
274 };
275
276 export const navigateToLogCollection = (uuid: string) =>
277     async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
278         try {
279             await services.collectionService.get(uuid);
280             dispatch<any>(navigateTo(uuid));
281         } catch {
282             dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Could not request collection', hideDuration: 2000, kind: SnackbarKind.ERROR }));
283         }
284     };
285
286 const ALL_FILTER_TYPE = 'All logs';
287
288 const MAIN_FILTER_TYPE = 'Main logs';
289 const MAIN_EVENT_TYPES = [
290     LogEventType.CRUNCH_RUN,
291     LogEventType.STDERR,
292     LogEventType.STDOUT,
293 ];
294
295 const PROCESS_PANEL_LOG_EVENT_TYPES = [
296     LogEventType.ARV_MOUNT,
297     LogEventType.CRUNCH_RUN,
298     LogEventType.CRUNCHSTAT,
299     LogEventType.DISPATCH,
300     LogEventType.HOSTSTAT,
301     LogEventType.NODE_INFO,
302     LogEventType.STDERR,
303     LogEventType.STDOUT,
304     LogEventType.CONTAINER,
305     LogEventType.KEEPSTORE,
306 ];
307
308 const NON_SORTED_LOG_TYPES = [
309     LogEventType.NODE_INFO,
310     LogEventType.CONTAINER,
311 ];